Skip to content

Commit

Permalink
feat: append messageIds to standard and timeout errors
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholasgriffintn committed Jan 28, 2025
1 parent 047e426 commit 13eceb2
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 2 deletions.
3 changes: 3 additions & 0 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -549,12 +549,14 @@ export class Consumer extends TypedEventEmitter {
throw toTimeoutError(
err,
`Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`,
message
);
}
if (err instanceof Error) {
throw toStandardError(
err,
`Unexpected message handler failure: ${err.message}`,
message
);
}
throw err;
Expand All @@ -581,6 +583,7 @@ export class Consumer extends TypedEventEmitter {
throw toStandardError(
err,
`Unexpected message handler failure: ${err.message}`,
messages,
);
}
throw err;
Expand Down
33 changes: 31 additions & 2 deletions src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { Message } from "@aws-sdk/client-sqs";

import { AWSError } from "./types.js";

class SQSError extends Error {
Expand All @@ -17,24 +19,28 @@ class SQSError extends Error {
}

class TimeoutError extends Error {
messageIds: string[];
cause: Error;
time: Date;

constructor(message = "Operation timed out.") {
super(message);
this.message = message;
this.name = "TimeoutError";
this.messageIds = [];
}
}

class StandardError extends Error {
messageIds: string[];
cause: Error;
time: Date;

constructor(message = "An unexpected error occurred:") {
super(message);
this.message = message;
this.name = "StandardError";
this.messageIds = [];
}
}

Expand Down Expand Up @@ -90,15 +96,32 @@ function toSQSError(
return sqsError;
}

/**
* Gets the message IDs from the message.
* @param message The message that was received from SQS.
*/
function getMessageIds(message: Message | Message[]): string[] {
if (Array.isArray(message)) {
return message.map((m) => m.MessageId);
}
return [message.MessageId];
}

/**
* Formats an Error to the StandardError type.
* @param err The error object that was received.
* @param message The message to send with the error.
* @param sqsMessage The message that was received from SQS.
*/
function toStandardError(err: Error, message: string): StandardError {
function toStandardError(
err: Error,
message: string,
sqsMessage: Message | Message[],
): StandardError {
const error = new StandardError(message);
error.cause = err;
error.time = new Date();
error.messageIds = getMessageIds(sqsMessage);

return error;
}
Expand All @@ -107,11 +130,17 @@ function toStandardError(err: Error, message: string): StandardError {
* Formats an Error to the TimeoutError type.
* @param err The error object that was received.
* @param message The message to send with the error.
* @param sqsMessage The message that was received from SQS.
*/
function toTimeoutError(err: TimeoutError, message: string): TimeoutError {
function toTimeoutError(
err: TimeoutError,
message: string,
sqsMessage: Message | Message[],
): TimeoutError {
const error = new TimeoutError(message);
error.cause = err;
error.time = new Date();
error.messageIds = getMessageIds(sqsMessage);

return error;
}
Expand Down
62 changes: 62 additions & 0 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ describe("Consumer", () => {
"Unexpected message handler failure: Processing error",
);
assert.equal(message.MessageId, "123");
assert.deepEqual((err as any).messageIds, ["123"]);
});

it("fires an `error` event when an `SQSError` occurs processing a message", async () => {
Expand Down Expand Up @@ -1707,6 +1708,67 @@ describe("Consumer", () => {
assert.ok(err);
assert.equal(err.message, "Error changing visibility timeout: failed");
});

it("includes messageIds in timeout errors", async () => {
const handleMessageTimeout = 500;
consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage: () =>
new Promise((resolve) => setTimeout(resolve, 1000)),
handleMessageTimeout,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
});

consumer.start();
const [err]: any = await Promise.all([
pEvent(consumer, "timeout_error"),
clock.tickAsync(handleMessageTimeout),
]);
consumer.stop();

assert.ok(err);
assert.equal(
err.message,
`Message handler timed out after ${handleMessageTimeout}ms: Operation timed out.`,
);
assert.deepEqual(err.messageIds, ["123"]);
});

it("includes messageIds in batch processing errors", async () => {
sqs.send.withArgs(mockReceiveMessage).resolves({
Messages: [
{ MessageId: "1", ReceiptHandle: "receipt-handle-1", Body: "body-1" },
{ MessageId: "2", ReceiptHandle: "receipt-handle-2", Body: "body-2" },
],
});

consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessageBatch: () => {
throw new Error("Batch processing error");
},
batchSize: 2,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
});

consumer.start();
const [err]: any = await Promise.all([
pEvent(consumer, "error"),
clock.tickAsync(100),
]);
consumer.stop();

assert.ok(err);
assert.equal(
err.message,
"Unexpected message handler failure: Batch processing error"
);
assert.deepEqual(err.messageIds, ["1", "2"]);
});
});

describe("event listeners", () => {
Expand Down

0 comments on commit 13eceb2

Please sign in to comment.