Skip to content

[FSSDK-11513] limit number of events in the eventStore #1053

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 157 additions & 2 deletions lib/event_processor/batch_event_processor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
import { expect, describe, it, vi, beforeEach, afterEach, MockInstance } from 'vitest';

import { EventWithId, BatchEventProcessor, LOGGER_NAME } from './batch_event_processor';
import { getMockSyncCache } from '../tests/mock/mock_cache';
import { getMockAsyncCache, getMockSyncCache } from '../tests/mock/mock_cache';
import { createImpressionEvent } from '../tests/mock/create_event';
import { ProcessableEvent } from './event_processor';
import { buildLogEvent } from './event_builder/log_event';
import { resolvablePromise } from '../utils/promise/resolvablePromise';
import { ResolvablePromise, resolvablePromise } from '../utils/promise/resolvablePromise';
import { advanceTimersByTime } from '../tests/testUtils';
import { getMockLogger } from '../tests/mock/mock_logger';
import { getMockRepeater } from '../tests/mock/mock_repeater';
import * as retry from '../utils/executor/backoff_retry_runner';
import { ServiceState, StartupLog } from '../service';
import { LogLevel } from '../logging/logger';
import { IdGenerator } from '../utils/id_generator';

const getMockDispatcher = () => {
return {
Expand Down Expand Up @@ -366,6 +367,160 @@ describe('BatchEventProcessor', async () => {

expect(events).toEqual(eventsInStore);
});

it('should not store the event in the eventStore but still dispatch if the \
number of pending events is greater than the limit', async () => {
const eventDispatcher = getMockDispatcher();
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
mockDispatch.mockResolvedValue(resolvablePromise().promise);

const eventStore = getMockSyncCache<EventWithId>();

const idGenerator = new IdGenerator();

for (let i = 0; i < 505; i++) {
const event = createImpressionEvent(`id-${i}`);
const cacheId = idGenerator.getId();
await eventStore.set(cacheId, { id: cacheId, event });
}

expect(eventStore.size()).toEqual(505);

const processor = new BatchEventProcessor({
eventDispatcher,
dispatchRepeater: getMockRepeater(),
batchSize: 1,
eventStore,
});

processor.start();
await processor.onRunning();

const events: ProcessableEvent[] = [];
for(let i = 0; i < 2; i++) {
const event = createImpressionEvent(`id-${i}`);
events.push(event);
await processor.process(event)
}

expect(eventStore.size()).toEqual(505);
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(507);
expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[0]]));
expect(eventDispatcher.dispatchEvent.mock.calls[506][0]).toEqual(buildLogEvent([events[1]]));
});

it('should store events in the eventStore when the number of events in the store\
becomes lower than the limit', async () => {
const eventDispatcher = getMockDispatcher();

const dispatchResponses: ResolvablePromise<any>[] = [];

const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
mockDispatch.mockImplementation((arg) => {
const dispatchResponse = resolvablePromise();
dispatchResponses.push(dispatchResponse);
return dispatchResponse.promise;
});

const eventStore = getMockSyncCache<EventWithId>();

const idGenerator = new IdGenerator();

for (let i = 0; i < 502; i++) {
const event = createImpressionEvent(`id-${i}`);
const cacheId = String(i);
await eventStore.set(cacheId, { id: cacheId, event });
}

expect(eventStore.size()).toEqual(502);

const processor = new BatchEventProcessor({
eventDispatcher,
dispatchRepeater: getMockRepeater(),
batchSize: 1,
eventStore,
});

processor.start();
await processor.onRunning();

let events: ProcessableEvent[] = [];
for(let i = 0; i < 2; i++) {
const event = createImpressionEvent(`id-${i + 502}`);
events.push(event);
await processor.process(event)
}

expect(eventStore.size()).toEqual(502);
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(504);

expect(eventDispatcher.dispatchEvent.mock.calls[502][0]).toEqual(buildLogEvent([events[0]]));
expect(eventDispatcher.dispatchEvent.mock.calls[503][0]).toEqual(buildLogEvent([events[1]]));

// resolve the dispatch for events not saved in the store
dispatchResponses[502].resolve({ statusCode: 200 });
dispatchResponses[503].resolve({ statusCode: 200 });

await exhaustMicrotasks();
expect(eventStore.size()).toEqual(502);

// resolve the dispatch for 3 events in store, making the store size 499 which is lower than the limit
dispatchResponses[0].resolve({ statusCode: 200 });
dispatchResponses[1].resolve({ statusCode: 200 });
dispatchResponses[2].resolve({ statusCode: 200 });

await exhaustMicrotasks();
expect(eventStore.size()).toEqual(499);

// process 2 more events
events = [];
for(let i = 0; i < 2; i++) {
const event = createImpressionEvent(`id-${i + 504}`);
events.push(event);
await processor.process(event)
}

expect(eventStore.size()).toEqual(500);
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(506);
expect(eventDispatcher.dispatchEvent.mock.calls[504][0]).toEqual(buildLogEvent([events[0]]));
expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[1]]));
});

it('should still dispatch events even if the store save fails', async () => {
const eventDispatcher = getMockDispatcher();
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
mockDispatch.mockResolvedValue({});

const eventStore = getMockAsyncCache<EventWithId>();
// Simulate failure in saving to store
eventStore.set = vi.fn().mockRejectedValue(new Error('Failed to save'));

const dispatchRepeater = getMockRepeater();

const processor = new BatchEventProcessor({
eventDispatcher,
dispatchRepeater,
batchSize: 100,
eventStore,
});

processor.start();
await processor.onRunning();

const events: ProcessableEvent[] = [];
for(let i = 0; i < 10; i++) {
const event = createImpressionEvent(`id-${i}`);
events.push(event);
await processor.process(event)
}

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(0);

await dispatchRepeater.execute(0);

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
expect(eventDispatcher.dispatchEvent.mock.calls[0][0]).toEqual(buildLogEvent(events));
});
});

it('should dispatch events when dispatchRepeater is triggered', async () => {
Expand Down
75 changes: 56 additions & 19 deletions lib/event_processor/batch_event_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import { EventProcessor, ProcessableEvent } from "./event_processor";
import { getBatchedAsync, getBatchedSync, Store } from "../utils/cache/store";
import { EventDispatcher, EventDispatcherResponse, LogEvent } from "./event_dispatcher/event_dispatcher";
import { buildLogEvent } from "./event_builder/log_event";
import { BackoffController, ExponentialBackoff, IntervalRepeater, Repeater } from "../utils/repeater/repeater";
import { BackoffController, ExponentialBackoff, Repeater } from "../utils/repeater/repeater";
import { LoggerFacade } from '../logging/logger';
import { BaseService, ServiceState, StartupLog } from "../service";
import { Consumer, Fn, Producer } from "../utils/type";
import { Consumer, Fn, Maybe, Producer } from "../utils/type";
import { RunResult, runWithRetry } from "../utils/executor/backoff_retry_runner";
import { isSuccessStatusCode } from "../utils/http_request_handler/http_util";
import { EventEmitter } from "../utils/event_emitter/event_emitter";
Expand All @@ -31,13 +31,16 @@ import { FAILED_TO_DISPATCH_EVENTS, SERVICE_NOT_RUNNING } from "error_message";
import { OptimizelyError } from "../error/optimizly_error";
import { sprintf } from "../utils/fns";
import { SERVICE_STOPPED_BEFORE_RUNNING } from "../service";
import { EVENT_STORE_FULL } from "../message/log_message";

export const DEFAULT_MIN_BACKOFF = 1000;
export const DEFAULT_MAX_BACKOFF = 32000;
export const MAX_EVENTS_IN_STORE = 500;

export type EventWithId = {
id: string;
event: ProcessableEvent;
notStored?: boolean;
};

export type RetryConfig = {
Expand All @@ -59,7 +62,7 @@ export type BatchEventProcessorConfig = {

type EventBatch = {
request: LogEvent,
ids: string[],
events: EventWithId[],
}

export const LOGGER_NAME = 'BatchEventProcessor';
Expand All @@ -70,11 +73,13 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
private eventQueue: EventWithId[] = [];
private batchSize: number;
private eventStore?: Store<EventWithId>;
private eventCountInStore: Maybe<number> = undefined;
private maxEventsInStore: number = MAX_EVENTS_IN_STORE;
private dispatchRepeater: Repeater;
private failedEventRepeater?: Repeater;
private idGenerator: IdGenerator = new IdGenerator();
private runningTask: Map<string, RunResult<EventDispatcherResponse>> = new Map();
private dispatchingEventIds: Set<string> = new Set();
private dispatchingEvents: Map<string, EventWithId> = new Map();
private eventEmitter: EventEmitter<{ dispatch: LogEvent }> = new EventEmitter();
private retryConfig?: RetryConfig;

Expand All @@ -84,11 +89,13 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
this.closingEventDispatcher = config.closingEventDispatcher;
this.batchSize = config.batchSize;
this.eventStore = config.eventStore;

this.retryConfig = config.retryConfig;

this.dispatchRepeater = config.dispatchRepeater;
this.dispatchRepeater.setTask(() => this.flush());

this.maxEventsInStore = Math.max(2 * config.batchSize, MAX_EVENTS_IN_STORE);
this.failedEventRepeater = config.failedEventRepeater;
this.failedEventRepeater?.setTask(() => this.retryFailedEvents());
if (config.logger) {
Expand All @@ -111,7 +118,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
}

const keys = (await this.eventStore.getKeys()).filter(
(k) => !this.dispatchingEventIds.has(k) && !this.eventQueue.find((e) => e.id === k)
(k) => !this.dispatchingEvents.has(k) && !this.eventQueue.find((e) => e.id === k)
);

const events = await (this.eventStore.operation === 'sync' ?
Expand All @@ -138,7 +145,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
(currentBatch.length > 0 && !areEventContextsEqual(currentBatch[0].event, event.event))) {
batches.push({
request: buildLogEvent(currentBatch.map((e) => e.event)),
ids: currentBatch.map((e) => e.id),
events: currentBatch,
});
currentBatch = [];
}
Expand All @@ -148,7 +155,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
if (currentBatch.length > 0) {
batches.push({
request: buildLogEvent(currentBatch.map((e) => e.event)),
ids: currentBatch.map((e) => e.id),
events: currentBatch,
});
}

Expand All @@ -163,15 +170,15 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
}

const events: ProcessableEvent[] = [];
const ids: string[] = [];
const eventWithIds: EventWithId[] = [];

this.eventQueue.forEach((event) => {
events.push(event.event);
ids.push(event.id);
eventWithIds.push(event);
});

this.eventQueue = [];
return { request: buildLogEvent(events), ids };
return { request: buildLogEvent(events), events: eventWithIds };
}

private async executeDispatch(request: LogEvent, closing = false): Promise<EventDispatcherResponse> {
Expand All @@ -185,10 +192,10 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
}

private dispatchBatch(batch: EventBatch, closing: boolean): void {
const { request, ids } = batch;
const { request, events } = batch;

ids.forEach((id) => {
this.dispatchingEventIds.add(id);
events.forEach((event) => {
this.dispatchingEvents.set(event.id, event);
});

const runResult: RunResult<EventDispatcherResponse> = this.retryConfig
Expand All @@ -205,9 +212,11 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
this.runningTask.set(taskId, runResult);

runResult.result.then((res) => {
ids.forEach((id) => {
this.dispatchingEventIds.delete(id);
this.eventStore?.remove(id);
events.forEach((event) => {
this.eventStore?.remove(event.id);
if (!event.notStored && this.eventCountInStore) {
Copy link
Preview

Copilot AI May 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check 'this.eventCountInStore' may fail when the count is 0 (since 0 is falsy), which could prevent the intended decrement. Consider replacing it with 'this.eventCountInStore !== undefined' to ensure the condition works correctly even when the count is 0.

Suggested change
if (!event.notStored && this.eventCountInStore) {
if (!event.notStored && this.eventCountInStore !== undefined) {

Copilot uses AI. Check for mistakes.

this.eventCountInStore--;
}
});
return Promise.resolve();
}).catch((err) => {
Expand All @@ -216,7 +225,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
this.logger?.error(err);
}).finally(() => {
this.runningTask.delete(taskId);
ids.forEach((id) => this.dispatchingEventIds.delete(id));
events.forEach((event) => this.dispatchingEvents.delete(event.id));
});
}

Expand All @@ -235,12 +244,12 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
return Promise.reject(new OptimizelyError(SERVICE_NOT_RUNNING, 'BatchEventProcessor'));
}

const eventWithId = {
const eventWithId: EventWithId = {
id: this.idGenerator.getId(),
event: event,
};

await this.eventStore?.set(eventWithId.id, eventWithId);
await this.storeEvent(eventWithId);

if (this.eventQueue.length > 0 && !areEventContextsEqual(this.eventQueue[0].event, event)) {
this.flush();
Expand All @@ -253,7 +262,35 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
} else if (!this.dispatchRepeater.isRunning()) {
this.dispatchRepeater.start();
}
}

private async findEventCountInStore(): Promise<void> {
if (this.eventStore && this.eventCountInStore === undefined) {
try {
const keys = await this.eventStore.getKeys();
this.eventCountInStore = keys.length;
} catch (e) {
this.logger?.error(e);
}
}
}

private async storeEvent(eventWithId: EventWithId): Promise<void> {
await this.findEventCountInStore();
if (this.eventCountInStore !== undefined && this.eventCountInStore >= this.maxEventsInStore) {
this.logger?.info(EVENT_STORE_FULL, eventWithId.event.uuid);
eventWithId.notStored = true;
return;
}

await Promise.resolve(this.eventStore?.set(eventWithId.id, eventWithId)).then(() => {
if (this.eventCountInStore !== undefined) {
this.eventCountInStore++;
}
}).catch((e) => {
eventWithId.notStored = true;
this.logger?.error(e);
});
}

start(): void {
Expand Down
1 change: 1 addition & 0 deletions lib/message/log_message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ export const USER_HAS_NO_FORCED_VARIATION_FOR_EXPERIMENT =
'No experiment %s mapped to user %s in the forced variation map.';
export const INVALID_EXPERIMENT_KEY_INFO =
'Experiment key %s is not in datafile. It is either invalid, paused, or archived.';
export const EVENT_STORE_FULL = 'Event store is full. Not saving event with id %d.';

export const messages: string[] = [];
Loading