From 10e445c3566d9c3127466bd7fa27c6cfbdb04365 Mon Sep 17 00:00:00 2001 From: Harold Hunt Date: Wed, 26 Feb 2025 22:05:50 -0500 Subject: [PATCH] Allow multiple in-order queued items --- README.md | 27 +++++- examples/queue-depth-control.ts | 107 +++++++++++++++++++++++ package.json | 1 + src/index.ts | 29 +++++- src/iterable-queue-mapper-simple.test.ts | 93 ++++++++++++++++++++ src/iterable-queue-mapper-simple.ts | 44 ++++++++-- 6 files changed, 293 insertions(+), 8 deletions(-) create mode 100644 examples/queue-depth-control.ts diff --git a/README.md b/README.md index 44e388b..1b6fa27 100644 --- a/README.md +++ b/README.md @@ -200,9 +200,32 @@ Run the example with `npm run example:iterable-queue-mapper` `IterableQueueMapperSimple` is similar to `IterableQueueMapper` but instead exposing the results as an iterable it discards the results as soon as they are ready and exposes any errors through the `errors` property. -See [examples/iterable-queue-mapper-simple.ts](./examples/iterable-queue-mapper-simple.ts) for an example. +## Queue Depth Control -Run the example with `npm run example:iterable-queue-mapper-simple` +The `IterableQueueMapperSimple` supports a `maxQueueDepth` option that controls how many items can be queued before blocking, independently from the `concurrency` setting. + +This is particularly useful for FIFO sequential processing scenarios where you want to: +1. Process items one at a time in order (concurrency: 1) +2. Allow multiple items to be queued up (maxQueueDepth: N) + +Example configuration for ordered database writes with a queue of 8 items: + +```typescript +const dbFlusher = new IterableQueueMapperSimple(writeToDatabase, { + concurrency: 1, // Process one at a time (sequential FIFO order) + maxQueueDepth: 8 // Allow up to 8 items to be queued +}); +``` + +This allows your application to queue up to 8 items for sequential processing, which can significantly improve throughput in scenarios where item generation is bursty but processing must be sequential. + +See [examples/iterable-queue-mapper-simple.ts](./examples/iterable-queue-mapper-simple.ts) for basic usage and [examples/queue-depth-control.ts](./examples/queue-depth-control.ts) for queue depth control examples. + +Run the examples with: +``` +npm run example:iterable-queue-mapper-simple +npm run example:queue-depth-control +``` # Contributing - Setting up Build Environment diff --git a/examples/queue-depth-control.ts b/examples/queue-depth-control.ts new file mode 100644 index 0000000..5ab442d --- /dev/null +++ b/examples/queue-depth-control.ts @@ -0,0 +1,107 @@ +/** + * Example demonstrating the queue depth control feature with SimpleBackgroundFlusher + */ +import { SimpleBackgroundFlusher } from '../src'; + +async function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** + * Example showing how to use SimpleBackgroundFlusher with different queue configurations + */ +async function main() { + // First example - default behavior (maxQueueDepth = concurrency) + await defaultBehaviorExample(); + + // Second example - FIFO sequential processing with separate queue depth + await sequentialFifoExample(); +} + +/** + * Default behavior (maxQueueDepth equals concurrency) + */ +async function defaultBehaviorExample() { + console.log('\n--- Default Behavior Example (maxQueueDepth = concurrency) ---'); + + // Simulate a destination write operation + const writeDestination = async (item: { id: number }) => { + console.log(`Processing item ${item.id}...`); + await delay(300); // Simulate I/O operation + }; + + // Create a simple background flusher with concurrency 2 + // By default, maxQueueDepth = concurrency (2) + const flusher = new SimpleBackgroundFlusher(writeDestination, { + concurrency: 2, // Process 2 items at a time, queue limited to 2 items + }); + + console.log('Adding items - only 2 can be queued before blocking (default behavior)'); + + const startTime = Date.now(); + for (let i = 1; i <= 6; i++) { + const item = { id: i }; + const enqueueStart = Date.now(); + console.log(`Trying to enqueue item ${i} at ${enqueueStart - startTime}ms`); + + await flusher.enqueue(item); + + const enqueueEnd = Date.now(); + console.log(`Item ${i} enqueued after ${enqueueEnd - enqueueStart}ms`); + + // No delay between enqueues to demonstrate blocking behavior + } + + console.log('Waiting for all processing to complete...'); + await flusher.onIdle(); + console.log('All items processed successfully'); +} + +/** + * Example showing how to use SimpleBackgroundFlusher with FIFO sequential processing + * and a separate queue depth control + */ +async function sequentialFifoExample() { + console.log('\n--- Sequential FIFO Processing Example ---'); + + // Simulate a database write operation that must be in order + const writeToDatabase = async (item: { id: number }) => { + console.log(`DB Writing item ${item.id}...`); + await delay(300); // Simulate DB write operation + }; + + // Create a simple background flusher with: + // - concurrency: 1 (process one item at a time) + // - maxQueueDepth: 5 (allow up to 5 items to be queued before blocking) + const dbFlusher = new SimpleBackgroundFlusher(writeToDatabase, { + concurrency: 1, // Process 1 write at a time (sequential FIFO processing) + maxQueueDepth: 5, // Allow up to 5 items to be queued before blocking + }); + + console.log('Adding items rapidly - they will queue up to maxQueueDepth limit (5)'); + + // This will queue up items quickly, but they will be processed one at a time + const startTime = Date.now(); + for (let i = 1; i <= 8; i++) { + const enqueueStart = Date.now(); + console.log(`Trying to enqueue DB item ${i} at ${enqueueStart - startTime}ms`); + + await dbFlusher.enqueue({ id: i }); + + const enqueueEnd = Date.now(); + const elapsed = enqueueEnd - enqueueStart; + + console.log(`Item ${i} enqueued after ${elapsed}ms`); + + // Items 1-5 should enqueue immediately, item 6 will block until item 1 is processed + } + + // Wait for all background operations to complete + console.log('Waiting for all DB writes to complete...'); + await dbFlusher.onIdle(); + console.log('All DB writes completed'); + console.log('Sequential FIFO Processing complete'); +} + +// Run the examples +main().catch(console.error); diff --git a/package.json b/package.json index 83b8074..959cfd4 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "example:iterable-mapper": "ts-node -r tsconfig-paths/register examples/iterable-mapper.ts", "example:iterable-queue-mapper": "ts-node -r tsconfig-paths/register examples/iterable-queue-mapper.ts", "example:iterable-queue-mapper-simple": "ts-node -r tsconfig-paths/register examples/iterable-queue-mapper-simple.ts", + "example:queue-depth-control": "ts-node -r tsconfig-paths/register examples/queue-depth-control.ts", "test": "jest", "lint": "eslint ./ --ext .ts --ext .tsx", "lint-and-fix": "eslint ./ --ext .ts --ext .tsx --fix" diff --git a/src/index.ts b/src/index.ts index bc318c6..f383be0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,9 +2,35 @@ import { Mapper, IterableMapper, IterableMapperOptions } from './iterable-mapper import { BlockingQueue, BlockingQueueOptions } from './blocking-queue'; import { IterableQueue, IterableQueueOptions } from './iterable-queue'; import { IterableQueueMapper, IterableQueueMapperOptions } from './iterable-queue-mapper'; -import { IterableQueueMapperSimple } from './iterable-queue-mapper-simple'; +import { + IterableQueueMapperSimple, + IterableQueueMapperSimpleOptions, +} from './iterable-queue-mapper-simple'; import { Queue } from './queue'; +// Create class aliases with more descriptive names +/** + * Prefetcher - Processes items from an iterable source in the background before they're needed. + * This is an alias for IterableMapper. + */ +export const Prefetcher = IterableMapper; +export type PrefetcherOptions = IterableMapperOptions; + +/** + * BackgroundFlusher - Processes items in the background with results accessible via iteration. + * This is an alias for IterableQueueMapper. + */ +export const BackgroundFlusher = IterableQueueMapper; +export type BackgroundFlusherOptions = IterableQueueMapperOptions; + +/** + * SimpleBackgroundFlusher - Processes items in the background, automatically discarding results. + * This is an alias for IterableQueueMapperSimple. + */ +export const SimpleBackgroundFlusher = IterableQueueMapperSimple; +export type SimpleBackgroundFlusherOptions = IterableQueueMapperSimpleOptions; + +// Export all types and classes export { Mapper, IterableMapper, @@ -12,6 +38,7 @@ export { IterableQueueMapper, IterableQueueMapperOptions, IterableQueueMapperSimple, + IterableQueueMapperSimpleOptions, BlockingQueue, BlockingQueueOptions, IterableQueue, diff --git a/src/iterable-queue-mapper-simple.test.ts b/src/iterable-queue-mapper-simple.test.ts index afc2af5..c82f1c7 100644 --- a/src/iterable-queue-mapper-simple.test.ts +++ b/src/iterable-queue-mapper-simple.test.ts @@ -124,4 +124,97 @@ describe('IterableQueueMapperSimple', () => { expect(backgroundWriter.errors.length).toBe(0); }); + + describe('maxQueueDepth option', () => { + // Test default behavior without using mocks + it('defaults maxQueueDepth to equal concurrency', async () => { + const mapper = jest.fn(async (): Promise => { + await sleep(10); + }); + + // With default configuration (no explicit maxQueueDepth) + const backgroundWriter = new IterableQueueMapperSimple(mapper, { + concurrency: 3, + }); + + // Queue 3 items (should accept without delay - concurrency 3, default maxQueueDepth 3) + await backgroundWriter.enqueue(1); + await backgroundWriter.enqueue(2); + await backgroundWriter.enqueue(3); + + // 4th item should only be accepted after one completes + const startTime = Date.now(); + await backgroundWriter.enqueue(4); + const elapsed = Date.now() - startTime; + + // Queue should have been full after first 3 items + expect(elapsed).toBeGreaterThan(5); + + await backgroundWriter.onIdle(); + }); + + it('allows configuring independent queue depth with maxQueueDepth', async () => { + // Using a longer sleep to make the test more reliable + const sleepTime = 50; + const mapper = jest.fn(async (): Promise => { + await sleep(sleepTime); + }); + + // Set concurrency to 1 but maxQueueDepth to 3 + // This means: + // - Only 1 item processed at a time + // - Up to 3 items can be queued before blocking + const backgroundWriter = new IterableQueueMapperSimple(mapper, { + concurrency: 1, + maxQueueDepth: 3, + }); + + // Queue 3 items (should accept without delay with maxQueueDepth 3) + await backgroundWriter.enqueue(1); + await backgroundWriter.enqueue(2); + await backgroundWriter.enqueue(3); + + // 4th item should have to wait + const startTime = Date.now(); + await backgroundWriter.enqueue(4); + const elapsed = Date.now() - startTime; + + // Verify that enqueuing the 4th item had to wait + expect(elapsed).toBeGreaterThanOrEqual(sleepTime - 10); // Allow some margin + + await backgroundWriter.onIdle(); + + // Verify all items were processed + expect(mapper).toHaveBeenCalledTimes(4); + }); + + it('processes items in FIFO order with concurrency 1', async () => { + const processedItems: number[] = []; + + // Create a mapper that records the order of processing + const mapper = jest.fn(async (item: number): Promise => { + await sleep(10); + processedItems.push(item); + }); + + const backgroundWriter = new IterableQueueMapperSimple(mapper, { + concurrency: 1, // Process 1 at a time (sequential FIFO) + maxQueueDepth: 5, // Allow up to 5 in queue + }); + + // Queue all items + await backgroundWriter.enqueue(1); + await backgroundWriter.enqueue(2); + await backgroundWriter.enqueue(3); + await backgroundWriter.enqueue(4); + await backgroundWriter.enqueue(5); + await backgroundWriter.enqueue(6); + + // Wait for all to complete + await backgroundWriter.onIdle(); + + // Verify FIFO order was maintained + expect(processedItems).toEqual([1, 2, 3, 4, 5, 6]); + }); + }); }); diff --git a/src/iterable-queue-mapper-simple.ts b/src/iterable-queue-mapper-simple.ts index 028cd04..d992858 100644 --- a/src/iterable-queue-mapper-simple.ts +++ b/src/iterable-queue-mapper-simple.ts @@ -1,4 +1,3 @@ -import { IterableMapperOptions, Mapper } from './iterable-mapper'; import { IterableQueueMapper } from './iterable-queue-mapper'; // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -9,7 +8,31 @@ const NoResult = Symbol('noresult'); /** * Options for IterableQueueMapperSimple */ -export type IterableQueueMapperSimpleOptions = Pick; +export interface IterableQueueMapperSimpleOptions { + /** + * Maximum number of concurrent invocations of `mapper` to run at once. + * + * Must be an integer from 1 and up or `Infinity`. + * + * @default 4 + */ + readonly concurrency?: number; + + /** + * Maximum number of items that can be queued before blocking. + * This allows control over how many items can be added to the queue + * while waiting for processing, independently of concurrency. + * + * For example, with concurrency=1 and maxQueueDepth=8, it will: + * - Process one item at a time in FIFO order + * - Allow up to 8 items to be queued before blocking on enqueue() + * + * Must be an integer from 1 and up or `Infinity`, and should be >= `concurrency`. + * + * @default Same as concurrency + */ + readonly maxQueueDepth?: number; +} /** * Accepts queue items via `enqueue` and calls the `mapper` on them @@ -25,6 +48,11 @@ export type IterableQueueMapperSimpleOptions = Pick { * @see {@link IterableMapper} for underlying mapper implementation and examples of combined usage */ constructor(mapper: Mapper, options: IterableQueueMapperSimpleOptions = {}) { - const { concurrency = 4 } = options; + const { concurrency = 4, maxQueueDepth } = options; + + // If maxQueueDepth is not specified, default to concurrency (maintaining backward compatibility) + const effectiveMaxUnread = maxQueueDepth !== undefined ? maxQueueDepth : concurrency; this._mapper = mapper; this.worker = this.worker.bind(this); - this._writer = new IterableQueueMapper(this.worker, { concurrency, maxUnread: concurrency }); + this._writer = new IterableQueueMapper(this.worker, { + concurrency, + maxUnread: effectiveMaxUnread, + }); // Discard all of the results this._done = this.discardResults();