AMQP-based reliable event handling in a service-based architecture. The library aims to:
- Provide a simple, opinionated, type-safe mechanism for event handling
- Ensure reliable event delivery, automatically retrying failed events
- Handle server shutdowns gracefully by waiting for in-flight events to finish processing
- Retry an event with a constant backoff strategy after its initial consumption failure
- Batch-publish events to improve processing throughput, and keep retrying failed publishes
- Job Queues: You can use this library to create a job queue, where you publish jobs to a queue, and have multiple services consume the jobs with limited concurrency.
- E.g. you can use this to create a system where you publish jobs like resize-image-job, send-email-job, generate-pdf-job, etc. and have multiple services consume these jobs to perform the actual work.
- Event Sourcing: You can use this library to create an event sourcing system, where you publish events to a queue, and have multiple services consume the events.
- E.g. you can use this to create a system where you publish events like user-created, user-updated, user-deleted, etc. and have multiple services consume these events to update their own databases, or perform some other action.
npm install git+https://github.com/chatdaddy/event-bridge.git
- Clone the repository
- Start the RabbitMQ server. We have a docker-compose file for that. Run docker-compose up -d
- Run the tests using npm run test
First, you'd want to define the types of events you're going to handle. This is done using a type alias, where the key is the event name and the value is the type of data associated with the event.
type EventMap = {
"resize-image-job": {
imageUrl: string;
};
};
import { makeAmqpEventBridge } from "@chatdaddy/event-bridge";
const bridge = makeAmqpEventBridge<EventMap>({
batcherConfig: {
// automatically push events every 350ms.
// Interval-based publishing is disabled by default,
// and can also be disabled by setting this to 0
eventsPushIntervalMs: 350,
// auto publish when the number of events reaches 500
maxEventsForFlush: 500,
// max retries for a failed publish
maxRetries: 3,
},
publishOptions: {
// timeout for the publish operation
timeout: 60_000,
},
});
// publish an event
bridge.publish(
"resize-image-job",
// owner of the event -- who owns the event & its data. Could be a user
// ID, organization ID, etc.
// this is optional though -- you can leave it as a blank string too
"user_1234",
{ imageUrl: "https://example.com/image.jpg" }
);
// flush events immediately
await bridge.flush();
The library batches events by the owner of the event, so if an owner produces multiple events in quick succession, they are batched together and published in a single publish operation.
If you'd really like to publish a single event in a single publish operation, you can do so by setting the batcherConfig.maxEventsForFlush to 1.
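To make the batching behaviour concrete, here is a minimal in-memory sketch of grouping pending events by owner and flushing once a count threshold is reached. It is an illustration only, not the library's implementation: OwnerBatcher, PendingEvent and publishBatch are hypothetical names, and the sketch assumes the threshold counts pending events across all owners.

```typescript
// Hypothetical sketch of owner-keyed batching; not the library's actual code.
type PendingEvent<T> = { event: string; data: T };

class OwnerBatcher<T> {
  private batches = new Map<string, PendingEvent<T>[]>();
  private pendingCount = 0;
  private readonly maxEventsForFlush: number;
  private readonly publishBatch: (ownerId: string, events: PendingEvent<T>[]) => void;

  constructor(
    maxEventsForFlush: number,
    publishBatch: (ownerId: string, events: PendingEvent<T>[]) => void
  ) {
    this.maxEventsForFlush = maxEventsForFlush;
    this.publishBatch = publishBatch;
  }

  // Queue an event under its owner; flush when the threshold is reached.
  // Assumption: the threshold counts pending events across all owners.
  add(ownerId: string, event: string, data: T): void {
    const batch = this.batches.get(ownerId) ?? [];
    batch.push({ event, data });
    this.batches.set(ownerId, batch);
    this.pendingCount += 1;
    if (this.pendingCount >= this.maxEventsForFlush) {
      this.flush();
    }
  }

  // Publish one batch per owner, then reset the pending state.
  flush(): void {
    const toPublish = this.batches;
    this.batches = new Map();
    this.pendingCount = 0;
    toPublish.forEach((events, ownerId) => this.publishBatch(ownerId, events));
  }
}

const published: string[] = [];
const batcher = new OwnerBatcher<{ imageUrl: string }>(3, (ownerId, events) => {
  published.push(`${ownerId}:${events.length}`);
});
batcher.add("user_1", "resize-image-job", { imageUrl: "https://example.com/a.jpg" });
batcher.add("user_2", "resize-image-job", { imageUrl: "https://example.com/b.jpg" });
batcher.add("user_1", "resize-image-job", { imageUrl: "https://example.com/c.jpg" });
// the third add reaches the threshold, so published is ["user_1:2", "user_2:1"]
```

With a threshold of 1, every add triggers a flush straight away, which is why setting maxEventsForFlush to 1 yields one publish operation per event.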
import { makeAmqpEventBridge } from "@chatdaddy/event-bridge";
const bridge = makeAmqpEventBridge<EventMap>({
subscriptions: [
{
queueName: "my-micro-service",
// handle the event -- in case the async fn
// rejects, it's counted as a failure
async onEvent({ event, data, msgId }) {
if (event === "resize-image-job") {
console.log(
`Resizing ${data.length} images`,
"with message id",
msgId
);
}
},
events: ["resize-image-job"],
// number of messages to process concurrently
// on a single worker
maxMessagesPerWorker: 10,
queueConfig: {
// will retry the event at most 2 times
// immediately after the initial failure
maxInitialRetries: 2,
// max number of times the event will be retried
// with the specified delay. After this, the event
// will be discarded. Set to 0 to disable delayed retries.
maxDelayedRetries: 3,
// retry after 1hr
delayedRetrySeconds: 60 * 60,
},
// will collect events for 250ms before combining them
// into a single data point & calls the onEvent handler
// once for the combined data. The messageId for this combined
// message will be a space separated list of the individual
// messageIds
batchConsumeIntervalMs: 250
},
],
batcherConfig: {
// automatically push events every 350ms
// or when the number of events reaches 500
eventsPushIntervalMs: 350,
maxEventsForFlush: 500,
// max retries for a failed publish
maxRetries: 3,
},
publishOptions: {
timeout: 60_000,
},
});
As you can see, since subscriptions is an array, you can have multiple subscriptions, each with their own queue name, event handlers, and queue configurations.
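The batchConsumeIntervalMs comment above says that messages collected within the interval are combined into a single data point, with a message ID made of the individual IDs separated by spaces. A rough sketch of that combining step could look like the following. ConsumedMessage and combineMessages are hypothetical names, and concatenating the data arrays in arrival order is an assumption about how the data is merged.

```typescript
// Hypothetical message shape, for illustration only.
type ConsumedMessage<T> = { msgId: string; data: T[] };

// Merge several messages into one: data arrays are concatenated in
// arrival order (an assumption), and message IDs are joined with spaces.
function combineMessages<T>(messages: ConsumedMessage<T>[]): ConsumedMessage<T> {
  const data: T[] = [];
  const msgIds: string[] = [];
  for (const message of messages) {
    msgIds.push(message.msgId);
    data.push(...message.data);
  }
  return { msgId: msgIds.join(" "), data };
}

const combined = combineMessages([
  { msgId: "msg_1", data: [{ imageUrl: "https://example.com/a.jpg" }] },
  { msgId: "msg_2", data: [{ imageUrl: "https://example.com/b.jpg" }] },
]);
// combined.msgId is "msg_1 msg_2" and combined.data has 2 entries
```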
Sometimes you might want to publish an event directly to a queue without any batching, or to another exchange. You can do so using the sendDirect method.
// the msg ID is returned in case you want to track the event
const { msgId } = await bridge.sendDirect({
event: "resize-image-job",
data: [{ imageUrl: "https://example.com/image.jpg" }],
ownerId: "some-owner-id",
queueName: "my-micro-service",
});
// gracefully shut down the event bridge
// will finish processing all in-flight events, and stop
// accepting new events
await bridge.close();
We utilise a dead-letter exchange to handle delayed retries.
Suppose a queue fun with an x-delivery-limit of N (which means the message will be retried at most N times before being discarded). It has an x-dead-letter-exchange argument that points to the exchange where failed messages are routed after N retries.
- fun_dlx: Dead-letter exchange for the fun queue, where failed messages are routed after N retries. It's exclusive to the fun queue
- fun_dlx_queue: Queue bound to fun_dlx with an x-message-ttl of T milliseconds. Has an argument x-dead-letter-exchange that points to the exchange where failed messages are sent
- x-message-ttl: Time-to-live (T milliseconds) for messages in fun_dlx_queue
- x-dead-letter-exchange: Argument that points to the exchange where failed messages are sent
- fun_dlx_back_to_queue: Exchange where the message will be sent from fun_dlx_queue once T milliseconds have passed
- M: Maximum number of delayed retries for a message (maxDelayedRetries)
- The fun queue has a retry limit (x-delivery-limit) of N and is configured with a dead-letter exchange (fun_dlx)
- When a message in fun fails to process N times, it is routed to the fun_dlx exchange
- The fun_dlx_queue is bound to fun_dlx and holds failed messages for T milliseconds (x-message-ttl) before being discarded or pushed to another exchange if an x-dead-letter-exchange argument is provided
- Once T milliseconds have passed, messages are pushed from fun_dlx_queue to the fun_dlx_back_to_queue exchange (this is specified by the x-dead-letter-exchange argument of fun_dlx_queue), which in turn requeues them to the fun queue
- If a message fails again after being requeued, it can be requeued up to M times (maxDelayedRetries)
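The topology described above can be summarised as plain data. The sketch below only derives the names and queue arguments mentioned in the text for a given queue name, N and T. It does not talk to RabbitMQ, and describeRetryTopology, RetryTopology and QueueDeclaration are hypothetical names rather than part of the library. How the cap of M delayed retries is enforced is not covered here.

```typescript
// Hypothetical types describing the retry topology; illustration only.
type QueueDeclaration = {
  name: string;
  arguments: Record<string, string | number>;
};

type RetryTopology = {
  mainQueue: QueueDeclaration;
  delayQueue: QueueDeclaration;
  exchanges: string[];
  bindings: { exchange: string; queue: string }[];
};

// Derive the exchanges, queues and arguments described in the text.
// deliveryLimit is N and retryDelayMs is T.
function describeRetryTopology(
  queueName: string,
  deliveryLimit: number,
  retryDelayMs: number
): RetryTopology {
  const dlx = `${queueName}_dlx`;
  const dlxQueue = `${queueName}_dlx_queue`;
  const backToQueue = `${queueName}_dlx_back_to_queue`;
  return {
    // after N failed deliveries, messages go to the dead-letter exchange
    mainQueue: {
      name: queueName,
      arguments: { "x-delivery-limit": deliveryLimit, "x-dead-letter-exchange": dlx },
    },
    // messages wait here for T ms, then go to the "back to queue" exchange
    delayQueue: {
      name: dlxQueue,
      arguments: { "x-message-ttl": retryDelayMs, "x-dead-letter-exchange": backToQueue },
    },
    exchanges: [dlx, backToQueue],
    bindings: [
      { exchange: dlx, queue: dlxQueue },
      { exchange: backToQueue, queue: queueName },
    ],
  };
}

// N = 2 deliveries, T = 1 hour
const topology = describeRetryTopology("fun", 2, 60 * 60 * 1000);
// topology.delayQueue.name is "fun_dlx_queue"
```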