Skip to content

Commit

Permalink
chore: add fifo queue warning
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholasgriffintn committed Jan 29, 2025
1 parent 3ca59bf commit 2fc0039
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ export class Consumer extends TypedEventEmitter {
private pollingTimeoutId: NodeJS.Timeout | undefined = undefined;
private stopped = true;
protected queueUrl: string;
private isFifoQueue: boolean;
private suppressFifoWarning: boolean;
private handleMessage: (message: Message) => Promise<Message | void>;
private handleMessageBatch: (message: Message[]) => Promise<Message[] | void>;
private preReceiveMessageCallback?: () => Promise<void>;
Expand Down Expand Up @@ -73,6 +75,8 @@ export class Consumer extends TypedEventEmitter {
super(options.queueUrl);
assertOptions(options);
this.queueUrl = options.queueUrl;
this.isFifoQueue = this.queueUrl.endsWith(".fifo");
this.suppressFifoWarning = options.suppressFifoWarning ?? false;
this.handleMessage = options.handleMessage;
this.handleMessageBatch = options.handleMessageBatch;
this.preReceiveMessageCallback = options.preReceiveMessageCallback;
Expand Down Expand Up @@ -115,6 +119,11 @@ export class Consumer extends TypedEventEmitter {
*/
public start(): void {
if (this.stopped) {
if (this.isFifoQueue && !this.suppressFifoWarning) {
logger.warn(
"WARNING: A FIFO queue was detected. SQS Consumer does not guarantee FIFO queues will work as expected. Set 'suppressFifoWarning: true' to disable this warning.",
);
}
// Create a new abort controller each time the consumer is started
this.abortController = new AbortController();
logger.debug("starting");
Expand Down
3 changes: 3 additions & 0 deletions src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@ const debug = createDebug("sqs-consumer");

export const logger = {
debug,
warn: (message: string) => {
console.log(message);
},
};
5 changes: 5 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ export interface ConsumerOptions {
* that occurred from AWS, such as the response and metadata.
*/
extendedAWSErrors?: boolean;
/**
* Set this to `true` if you want to suppress the warning about FIFO queues.
* @defaultvalue `false`
*/
suppressFifoWarning?: boolean;
}

/**
Expand Down
71 changes: 71 additions & 0 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1862,6 +1862,77 @@ describe("Consumer", () => {
});
});

describe("FIFO Queue Warning", () => {
let warnStub: sinon.SinonStub;

beforeEach(() => {
warnStub = sandbox.stub(logger, "warn");
});

it("emits a warning when starting with a FIFO queue URL", () => {
consumer = new Consumer({
queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/queue.fifo",
region: REGION,
handleMessage,
sqs,
});

consumer.start();
consumer.stop();

sandbox.assert.calledOnce(warnStub);
sandbox.assert.calledWithMatch(
warnStub,
"WARNING: A FIFO queue was detected. SQS Consumer does not guarantee FIFO queues will work as expected. Set 'suppressFifoWarning: true' to disable this warning.",
);
});

it("does not emit warning for standard queue URLs", () => {
consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage,
sqs,
});

consumer.start();
consumer.stop();

sandbox.assert.notCalled(warnStub);
});

it("suppresses warning when suppressFifoWarning option is true", () => {
consumer = new Consumer({
queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/queue.fifo",
region: REGION,
handleMessage,
sqs,
suppressFifoWarning: true,
});

consumer.start();
consumer.stop();

sandbox.assert.notCalled(warnStub);
});

it("emits warning on multiple start calls with FIFO queue", () => {
consumer = new Consumer({
queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/queue.fifo",
region: REGION,
handleMessage,
sqs,
});

consumer.start();
consumer.stop();
consumer.start();
consumer.stop();

sandbox.assert.calledTwice(warnStub);
});
});

describe("event listeners", () => {
it("fires the event multiple times", async () => {
sqs.send.withArgs(mockReceiveMessage).resolves({});
Expand Down

6 comments on commit 2fc0039

@pveller
Copy link

@pveller pveller commented on 2fc0039 Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nicholasgriffintn , can you please explain why SQS Consumer does not guarantee FIFO queues will work as expected ? Where (and why) do the FIFO guarantees break?

@nicholasgriffintn
Copy link
Member Author

@nicholasgriffintn nicholasgriffintn commented on 2fc0039 Feb 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This hasn't been released outside of canary just yet but basically, SQS Consumer is configured to work in a different way to FIFO by default, this means that unless you've configured it in a particular way, you may not get the same FIFO experience.

I haven't thought about what it will take to support FIFO queues, we also don't use them internally, so currently, this message is just there to make it clear that we don't have that guaranteed support, I believe they will still work, however, the experience may not be what you expect depending on your configuration.

@pveller
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm... @nicholasgriffintn , do you mind explaining a bit more? We do use FIFO and we use sqs-consumer where we can't leverage SQS -> Lambda integration. I don't see where sqs-consumer's approach to reading messages breaks FIFO guarantees. You poll. Then you process. Then you delete (acknowledge) on success,. Then you poll again. if you don't delete, you emit an error. FIFO queue will keep the messages for the same MessageGroupId unril after the messages it has released have been acknowledged as processed. Where does the guarantee break? I must be missing something... Thanks

@nicholasgriffintn
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well basically because we process in parallel by default rather than in order, which potentially breaks the guarantee of first in, first out if you don't have the correct config.

This is more of a case that makes it clear that we are not actively developing SQS Consumer to work with FIFO (as we don't use it), rather than a specific declaration as to if it does or doesn't work, I actually don't know if it does or doesn't as I don't use FIFO for any implementations.

@pveller
Copy link

@pveller pveller commented on 2fc0039 Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nicholasgriffintn , I think I know what you mean. If I supply handleMessage(), you run those over Promise.all() vs for of. If I supply handleMessageBatch() instead, you hand me the batch (that is ordered) and it's on me to run it in order to keep the FIFO guarantees.

I would recommend a few things if I may:

  • spell out in the README where you talk about FIFO that FIFO requires handleMessageBatch()
  • only log the WARN when you see isFifoQueue + handleMessage (vs. handleMessageBatch)
  • maybe use for of instead of Promise.all() when it's isFifoQueue? and then no warning needed? 😄

Thank you!

p.s. we use handleMessageBatch() with our FIFO queues

@nicholasgriffintn
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, thanks for the suggestions, this is actually why I haven't put it further than canary yet.

Using a non parallel loop is a consideration but I'm not sure if we'd do it or not, it's a weigh up between what we need ourselves / the capacity to support, it's difficult to choose to add code that we won't be using ourselves.

Please sign in to comment.