Skip to content

Allow multiple in-order queued items #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,32 @@ Run the example with `npm run example:iterable-queue-mapper`

`IterableQueueMapperSimple` is similar to `IterableQueueMapper` but instead exposing the results as an iterable it discards the results as soon as they are ready and exposes any errors through the `errors` property.

See [examples/iterable-queue-mapper-simple.ts](./examples/iterable-queue-mapper-simple.ts) for an example.
## Queue Depth Control

Run the example with `npm run example:iterable-queue-mapper-simple`
The `IterableQueueMapperSimple` supports a `maxQueueDepth` option that controls how many items can be queued before blocking, independently from the `concurrency` setting.

This is particularly useful for FIFO sequential processing scenarios where you want to:
1. Process items one at a time in order (concurrency: 1)
2. Allow multiple items to be queued up (maxQueueDepth: N)

Example configuration for ordered database writes with a queue of 8 items:

```typescript
const dbFlusher = new IterableQueueMapperSimple(writeToDatabase, {
concurrency: 1, // Process one at a time (sequential FIFO order)
maxQueueDepth: 8 // Allow up to 8 items to be queued
});
```

This allows your application to queue up to 8 items for sequential processing, which can significantly improve throughput in scenarios where item generation is bursty but processing must be sequential.

See [examples/iterable-queue-mapper-simple.ts](./examples/iterable-queue-mapper-simple.ts) for basic usage and [examples/queue-depth-control.ts](./examples/queue-depth-control.ts) for queue depth control examples.

Run the examples with:
```
npm run example:iterable-queue-mapper-simple
npm run example:queue-depth-control
```

# Contributing - Setting up Build Environment

Expand Down
107 changes: 107 additions & 0 deletions examples/queue-depth-control.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/**
* Example demonstrating the queue depth control feature with SimpleBackgroundFlusher
*/
import { SimpleBackgroundFlusher } from '../src';

async function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
* Example showing how to use SimpleBackgroundFlusher with different queue configurations
*/
async function main() {
// First example - default behavior (maxQueueDepth = concurrency)
await defaultBehaviorExample();

// Second example - FIFO sequential processing with separate queue depth
await sequentialFifoExample();
}

/**
* Default behavior (maxQueueDepth equals concurrency)
*/
async function defaultBehaviorExample() {
console.log('\n--- Default Behavior Example (maxQueueDepth = concurrency) ---');

// Simulate a destination write operation
const writeDestination = async (item: { id: number }) => {
console.log(`Processing item ${item.id}...`);
await delay(300); // Simulate I/O operation
};

// Create a simple background flusher with concurrency 2
// By default, maxQueueDepth = concurrency (2)
const flusher = new SimpleBackgroundFlusher(writeDestination, {
concurrency: 2, // Process 2 items at a time, queue limited to 2 items
});

console.log('Adding items - only 2 can be queued before blocking (default behavior)');

const startTime = Date.now();
for (let i = 1; i <= 6; i++) {
const item = { id: i };
const enqueueStart = Date.now();
console.log(`Trying to enqueue item ${i} at ${enqueueStart - startTime}ms`);

await flusher.enqueue(item);

const enqueueEnd = Date.now();
console.log(`Item ${i} enqueued after ${enqueueEnd - enqueueStart}ms`);

// No delay between enqueues to demonstrate blocking behavior
}

console.log('Waiting for all processing to complete...');
await flusher.onIdle();
console.log('All items processed successfully');
}

/**
* Example showing how to use SimpleBackgroundFlusher with FIFO sequential processing
* and a separate queue depth control
*/
async function sequentialFifoExample() {
console.log('\n--- Sequential FIFO Processing Example ---');

// Simulate a database write operation that must be in order
const writeToDatabase = async (item: { id: number }) => {
console.log(`DB Writing item ${item.id}...`);
await delay(300); // Simulate DB write operation
};

// Create a simple background flusher with:
// - concurrency: 1 (process one item at a time)
// - maxQueueDepth: 5 (allow up to 5 items to be queued before blocking)
const dbFlusher = new SimpleBackgroundFlusher(writeToDatabase, {
concurrency: 1, // Process 1 write at a time (sequential FIFO processing)
maxQueueDepth: 5, // Allow up to 5 items to be queued before blocking
});

console.log('Adding items rapidly - they will queue up to maxQueueDepth limit (5)');

// This will queue up items quickly, but they will be processed one at a time
const startTime = Date.now();
for (let i = 1; i <= 8; i++) {
const enqueueStart = Date.now();
console.log(`Trying to enqueue DB item ${i} at ${enqueueStart - startTime}ms`);

await dbFlusher.enqueue({ id: i });

const enqueueEnd = Date.now();
const elapsed = enqueueEnd - enqueueStart;

console.log(`Item ${i} enqueued after ${elapsed}ms`);

// Items 1-5 should enqueue immediately, item 6 will block until item 1 is processed
}

// Wait for all background operations to complete
console.log('Waiting for all DB writes to complete...');
await dbFlusher.onIdle();
console.log('All DB writes completed');
console.log('Sequential FIFO Processing complete');
}

// Run the examples
main().catch(console.error);
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"example:iterable-mapper": "ts-node -r tsconfig-paths/register examples/iterable-mapper.ts",
"example:iterable-queue-mapper": "ts-node -r tsconfig-paths/register examples/iterable-queue-mapper.ts",
"example:iterable-queue-mapper-simple": "ts-node -r tsconfig-paths/register examples/iterable-queue-mapper-simple.ts",
"example:queue-depth-control": "ts-node -r tsconfig-paths/register examples/queue-depth-control.ts",
"test": "jest",
"lint": "eslint ./ --ext .ts --ext .tsx",
"lint-and-fix": "eslint ./ --ext .ts --ext .tsx --fix"
Expand Down
29 changes: 28 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,43 @@ import { Mapper, IterableMapper, IterableMapperOptions } from './iterable-mapper
import { BlockingQueue, BlockingQueueOptions } from './blocking-queue';
import { IterableQueue, IterableQueueOptions } from './iterable-queue';
import { IterableQueueMapper, IterableQueueMapperOptions } from './iterable-queue-mapper';
import { IterableQueueMapperSimple } from './iterable-queue-mapper-simple';
import {
IterableQueueMapperSimple,
IterableQueueMapperSimpleOptions,
} from './iterable-queue-mapper-simple';
import { Queue } from './queue';

// Create class aliases with more descriptive names
/**
* Prefetcher - Processes items from an iterable source in the background before they're needed.
* This is an alias for IterableMapper.
*/
export const Prefetcher = IterableMapper;
export type PrefetcherOptions = IterableMapperOptions;

/**
* BackgroundFlusher - Processes items in the background with results accessible via iteration.
* This is an alias for IterableQueueMapper.
*/
export const BackgroundFlusher = IterableQueueMapper;
export type BackgroundFlusherOptions = IterableQueueMapperOptions;

/**
* SimpleBackgroundFlusher - Processes items in the background, automatically discarding results.
* This is an alias for IterableQueueMapperSimple.
*/
export const SimpleBackgroundFlusher = IterableQueueMapperSimple;
export type SimpleBackgroundFlusherOptions = IterableQueueMapperSimpleOptions;

// Export all types and classes
export {
Mapper,
IterableMapper,
IterableMapperOptions,
IterableQueueMapper,
IterableQueueMapperOptions,
IterableQueueMapperSimple,
IterableQueueMapperSimpleOptions,
BlockingQueue,
BlockingQueueOptions,
IterableQueue,
Expand Down
93 changes: 93 additions & 0 deletions src/iterable-queue-mapper-simple.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,97 @@ describe('IterableQueueMapperSimple', () => {

expect(backgroundWriter.errors.length).toBe(0);
});

describe('maxQueueDepth option', () => {
// Test default behavior without using mocks
it('defaults maxQueueDepth to equal concurrency', async () => {
const mapper = jest.fn(async (): Promise<void> => {
await sleep(10);
});

// With default configuration (no explicit maxQueueDepth)
const backgroundWriter = new IterableQueueMapperSimple(mapper, {
concurrency: 3,
});

// Queue 3 items (should accept without delay - concurrency 3, default maxQueueDepth 3)
await backgroundWriter.enqueue(1);
await backgroundWriter.enqueue(2);
await backgroundWriter.enqueue(3);

// 4th item should only be accepted after one completes
const startTime = Date.now();
await backgroundWriter.enqueue(4);
const elapsed = Date.now() - startTime;

// Queue should have been full after first 3 items
expect(elapsed).toBeGreaterThan(5);

await backgroundWriter.onIdle();
});

it('allows configuring independent queue depth with maxQueueDepth', async () => {
// Using a longer sleep to make the test more reliable
const sleepTime = 50;
const mapper = jest.fn(async (): Promise<void> => {
await sleep(sleepTime);
});

// Set concurrency to 1 but maxQueueDepth to 3
// This means:
// - Only 1 item processed at a time
// - Up to 3 items can be queued before blocking
const backgroundWriter = new IterableQueueMapperSimple(mapper, {
concurrency: 1,
maxQueueDepth: 3,
});

// Queue 3 items (should accept without delay with maxQueueDepth 3)
await backgroundWriter.enqueue(1);
await backgroundWriter.enqueue(2);
await backgroundWriter.enqueue(3);

// 4th item should have to wait
const startTime = Date.now();
await backgroundWriter.enqueue(4);
const elapsed = Date.now() - startTime;

// Verify that enqueuing the 4th item had to wait
expect(elapsed).toBeGreaterThanOrEqual(sleepTime - 10); // Allow some margin

await backgroundWriter.onIdle();

// Verify all items were processed
expect(mapper).toHaveBeenCalledTimes(4);
});

it('processes items in FIFO order with concurrency 1', async () => {
const processedItems: number[] = [];

// Create a mapper that records the order of processing
const mapper = jest.fn(async (item: number): Promise<void> => {
await sleep(10);
processedItems.push(item);
});

const backgroundWriter = new IterableQueueMapperSimple(mapper, {
concurrency: 1, // Process 1 at a time (sequential FIFO)
maxQueueDepth: 5, // Allow up to 5 in queue
});

// Queue all items
await backgroundWriter.enqueue(1);
await backgroundWriter.enqueue(2);
await backgroundWriter.enqueue(3);
await backgroundWriter.enqueue(4);
await backgroundWriter.enqueue(5);
await backgroundWriter.enqueue(6);

// Wait for all to complete
await backgroundWriter.onIdle();

// Verify FIFO order was maintained
expect(processedItems).toEqual([1, 2, 3, 4, 5, 6]);
});
});
});
44 changes: 39 additions & 5 deletions src/iterable-queue-mapper-simple.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { IterableMapperOptions, Mapper } from './iterable-mapper';
import { IterableQueueMapper } from './iterable-queue-mapper';

// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand All @@ -9,7 +8,31 @@
/**
* Options for IterableQueueMapperSimple
*/
export type IterableQueueMapperSimpleOptions = Pick<IterableMapperOptions, 'concurrency'>;
export interface IterableQueueMapperSimpleOptions {
/**
* Maximum number of concurrent invocations of `mapper` to run at once.
*
* Must be an integer from 1 and up or `Infinity`.
*
* @default 4
*/
readonly concurrency?: number;

/**
* Maximum number of items that can be queued before blocking.
* This allows control over how many items can be added to the queue
* while waiting for processing, independently of concurrency.
*
* For example, with concurrency=1 and maxQueueDepth=8, it will:
* - Process one item at a time in FIFO order
* - Allow up to 8 items to be queued before blocking on enqueue()
*
* Must be an integer from 1 and up or `Infinity`, and should be >= `concurrency`.
*
* @default Same as concurrency
*/
readonly maxQueueDepth?: number;
}

/**
* Accepts queue items via `enqueue` and calls the `mapper` on them
Expand All @@ -25,6 +48,11 @@
* - In the simple sequential (`concurrency: 1`) case, allows 1 item to be flushed async while caller prepares next item
* - Results of the flushed items are not needed in a subsequent step (if they are, use `IterableQueueMapper`)
*
* ### Queue Management
* - Use `concurrency: 1` for sequential processing (e.g., for DB writes that must be in order)
* - Use `maxQueueDepth` to control how many items can be queued up before blocking
* - For example, `{ concurrency: 1, maxQueueDepth: 8 }` allows 8 items to be queued while processing 1 at a time
*
* ### Error Handling
* The mapper should ideally handle all errors internally to enable error handling
* closest to where they occur. However, if errors do escape the mapper:
Expand All @@ -33,7 +61,7 @@
* - Errors can be checked/handled during processing via the `errors` property
*
* Key Differences from `IterableQueueMapper`:
* - `maxUnread` defaults to equal `concurrency` (simplifying queue management)
* - `maxQueueDepth` controls how many items can be queued before blocking (defaults to equal `concurrency`)
* - Results are automatically iterated and discarded (all work should happen in mapper)
* - Errors are collected rather than thrown (available via errors property)
*
Expand All @@ -55,7 +83,7 @@
private readonly _writer: IterableQueueMapper<Element, typeof NoResult>;
private readonly _errors: Errors<Element> = [];
private readonly _done: Promise<void>;
private readonly _mapper: Mapper<Element, void>;

Check failure on line 86 in src/iterable-queue-mapper-simple.ts

View workflow job for this annotation

GitHub Actions / build

Cannot find name 'Mapper'.
private _isIdle = false;

/**
Expand All @@ -69,12 +97,18 @@
* @see {@link IterableQueueMapper} for related class with more configuration options
* @see {@link IterableMapper} for underlying mapper implementation and examples of combined usage
*/
constructor(mapper: Mapper<Element, void>, options: IterableQueueMapperSimpleOptions = {}) {

Check failure on line 100 in src/iterable-queue-mapper-simple.ts

View workflow job for this annotation

GitHub Actions / build

Cannot find name 'Mapper'.
const { concurrency = 4 } = options;
const { concurrency = 4, maxQueueDepth } = options;

// If maxQueueDepth is not specified, default to concurrency (maintaining backward compatibility)
const effectiveMaxUnread = maxQueueDepth !== undefined ? maxQueueDepth : concurrency;

this._mapper = mapper;
this.worker = this.worker.bind(this);
this._writer = new IterableQueueMapper(this.worker, { concurrency, maxUnread: concurrency });
this._writer = new IterableQueueMapper(this.worker, {
concurrency,
maxUnread: effectiveMaxUnread,
});

// Discard all of the results
this._done = this.discardResults();
Expand Down
Loading