Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding the ability to execute the consumer concurrently #449

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
* The current polling status of the consumer.
*/
export enum POLLING_STATUS {
ACTIVE,
WAITING,
INACTIVE,
READY,
}
60 changes: 55 additions & 5 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
} from "@aws-sdk/client-sqs";

import { ConsumerOptions, StopOptions, UpdatableOptions } from "./types.js";
import { POLLING_STATUS } from "./constants.js";
import { TypedEventEmitter } from "./emitter.js";
import { autoBind } from "./bind.js";
import {
Expand Down Expand Up @@ -49,6 +50,9 @@ export class Consumer extends TypedEventEmitter {
private shouldDeleteMessages: boolean;
private alwaysAcknowledge: boolean;
private batchSize: number;
private concurrency: number;
private concurrentExecutions: number;
private pollingStatus: POLLING_STATUS;
private visibilityTimeout: number;
private terminateVisibilityTimeout: boolean;
private waitTimeSeconds: number;
Expand All @@ -72,6 +76,7 @@ export class Consumer extends TypedEventEmitter {
this.attributeNames = options.attributeNames || [];
this.messageAttributeNames = options.messageAttributeNames || [];
this.batchSize = options.batchSize || 1;
this.concurrency = options.concurrency ?? 1;
this.visibilityTimeout = options.visibilityTimeout;
this.terminateVisibilityTimeout =
options.terminateVisibilityTimeout || false;
Expand Down Expand Up @@ -176,15 +181,20 @@ export class Consumer extends TypedEventEmitter {

/**
* Returns the current status of the consumer.
* This includes whether it is running or currently polling.
* This includes whether it is running or currently polling as well as the current
* number of concurrent executions and polling status.
*/
public get status(): {
isRunning: boolean;
isPolling: boolean;
pollingStatus: POLLING_STATUS;
concurrentExecutions: number;
} {
return {
isRunning: !this.stopped,
isPolling: this.isPolling,
pollingStatus: this.pollingStatus,
concurrentExecutions: this.concurrentExecutions,
};
}

Expand Down Expand Up @@ -221,11 +231,27 @@ export class Consumer extends TypedEventEmitter {
}
}

/**
* Queue a poll to be executed after a timeout
* @param timeout The timeout to wait before polling
* @returns The timeout id
*/
private queuePoll(timeout?: number) {
if (this.pollingStatus !== POLLING_STATUS.WAITING) {
this.pollingStatus = POLLING_STATUS.WAITING;
if (this.pollingTimeoutId) {
clearTimeout(this.pollingTimeoutId);
}
this.pollingTimeoutId = setTimeout(this.poll, timeout);
}
}

/**
* Poll for new messages from SQS
*/
private poll(): void {
if (this.stopped) {
this.pollingStatus = POLLING_STATUS.INACTIVE;
logger.debug("cancelling_poll", {
detail:
"Poll was called while consumer was stopped, cancelling poll...",
Expand All @@ -238,6 +264,22 @@ export class Consumer extends TypedEventEmitter {
this.isPolling = true;

let currentPollingTimeout = this.pollingWaitTimeMs;

const isConcurrencyReached = this.concurrentExecutions >= this.concurrency;

if (isConcurrencyReached) {
logger.debug("reached_concurrency_limit", {
detail:
"The concurrency limit has been reached. Pausing before retrying.",
});
this.pollingStatus = POLLING_STATUS.READY;
this.queuePoll(currentPollingTimeout);
return;
}

logger.debug("polling");
this.pollingStatus = POLLING_STATUS.ACTIVE;

this.receiveMessage({
QueueUrl: this.queueUrl,
AttributeNames: this.attributeNames,
Expand All @@ -246,6 +288,10 @@ export class Consumer extends TypedEventEmitter {
WaitTimeSeconds: this.waitTimeSeconds,
VisibilityTimeout: this.visibilityTimeout,
})
.then((response) => {
this.queuePoll(currentPollingTimeout);
return response;
})
.then(this.handleSqsResponse)
.catch((err) => {
this.emitError(err);
Expand All @@ -259,15 +305,15 @@ export class Consumer extends TypedEventEmitter {
return;
})
.then(() => {
if (this.pollingTimeoutId) {
clearTimeout(this.pollingTimeoutId);
}
this.pollingTimeoutId = setTimeout(this.poll, currentPollingTimeout);
this.queuePoll(currentPollingTimeout);
})
.catch((err) => {
this.emitError(err);
})
.finally(() => {
if (this.pollingStatus === POLLING_STATUS.ACTIVE) {
this.pollingStatus = POLLING_STATUS.INACTIVE;
}
this.isPolling = false;
});
}
Expand Down Expand Up @@ -306,12 +352,16 @@ export class Consumer extends TypedEventEmitter {
response: ReceiveMessageCommandOutput,
): Promise<void> {
if (hasMessages(response)) {
this.concurrentExecutions += 1;

if (this.handleMessageBatch) {
await this.processMessageBatch(response.Messages);
} else {
await Promise.all(response.Messages.map(this.processMessage));
}

this.concurrentExecutions -= 1;

this.emit("response_processed");
} else if (response) {
this.emit("empty");
Expand Down
5 changes: 5 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ export interface ConsumerOptions {
* @defaultvalue `1`
*/
batchSize?: number;
/**
* The number of messages (or batches if `handleMessageBatch` is set) to
* process concurrently.
*/
concurrency?: number;
/**
* The duration (in seconds) that the received messages are hidden from subsequent
* retrieve requests after being retrieved by a ReceiveMessage request.
Expand Down
8 changes: 8 additions & 0 deletions src/validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ function validateOption(
throw new Error("batchSize must be between 1 and 10.");
}
break;
case "concurrency":
if (value < 1) {
throw new Error("concurrency must be greater than 0.");
}
break;
case "heartbeatInterval":
if (
!allOptions.visibilityTimeout ||
Expand Down Expand Up @@ -75,6 +80,9 @@ function assertOptions(options: ConsumerOptions): void {
if (options.batchSize) {
validateOption("batchSize", options.batchSize, options);
}
if (options.concurrency) {
validateOption("concurrency", options.concurrency, options);
}
if (options.heartbeatInterval) {
validateOption("heartbeatInterval", options.heartbeatInterval, options);
}
Expand Down
66 changes: 66 additions & 0 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,17 @@ describe("Consumer", () => {
}, "batchSize must be between 1 and 10.");
});

it("requires the concurrency option to be greater than 0", () => {
assert.throws(() => {
new Consumer({
region: REGION,
queueUrl: QUEUE_URL,
handleMessage,
concurrency: 0,
});
}, "concurrency must be greater than 0.");
});

it("requires visibilityTimeout to be set with heartbeatInterval", () => {
assert.throws(() => {
new Consumer({
Expand All @@ -139,6 +150,17 @@ describe("Consumer", () => {
}, "heartbeatInterval must be less than visibilityTimeout.");
});

it("requires heartbeatInterval to be less than visibilityTimeout", () => {
assert.throws(() => {
new Consumer({
region: REGION,
queueUrl: QUEUE_URL,
handleMessage,
heartbeatInterval: 30,
});
}, "heartbeatInterval must be less than visibilityTimeout.");
});

it("requires heartbeatInterval to be less than visibilityTimeout", () => {
assert.throws(() => {
new Consumer({
Expand Down Expand Up @@ -1649,6 +1671,26 @@ describe("Consumer", () => {
});
});

describe("getStatus", async () => {
it("returns the status of the consumer when it is stopped", () => {
assert.equal(consumer.getStatus(), {
isRunning: false,
pollingStatus: "stopped",
concurrentExecutions: 0,
});
});

it("returns the status of the consumer when it is running", () => {
consumer.start();
assert.equal(consumer.getStatus(), {
isRunning: true,
pollingStatus: "running",
concurrentExecutions: 1,
});
consumer.stop();
});
});

describe("updateOption", async () => {
it("updates the visibilityTimeout option and emits an event", () => {
const optionUpdatedListener = sandbox.stub();
Expand Down Expand Up @@ -1723,6 +1765,30 @@ describe("Consumer", () => {
sandbox.assert.notCalled(optionUpdatedListener);
});

it("updates the concurrency option and emits an event", () => {
const optionUpdatedListener = sandbox.stub();
consumer.on("option_updated", optionUpdatedListener);

consumer.updateOption("concurrency", 4);

assert.equal(consumer.concurrency, 4);

sandbox.assert.calledWithMatch(optionUpdatedListener, "concurrency", 4);
});

it("does not update the concurrency if the value is less than 1", () => {
const optionUpdatedListener = sandbox.stub();
consumer.on("option_updated", optionUpdatedListener);

assert.throws(() => {
consumer.updateOption("concurrency", 0);
}, "concurrency must be greater than 0.");

assert.equal(consumer.concurrency, 1);

sandbox.assert.notCalled(optionUpdatedListener);
});

it("updates the waitTimeSeconds option and emits an event", () => {
const optionUpdatedListener = sandbox.stub();
consumer.on("option_updated", optionUpdatedListener);
Expand Down
Loading