Skip to content

Commit cdb729f

Browse files
committed
[FSSDK-11513] limit number of events in the eventStore
1 parent 3c63d79 commit cdb729f

File tree

3 files changed

+218
-21
lines changed

3 files changed

+218
-21
lines changed

lib/event_processor/batch_event_processor.spec.ts

+157-2
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,18 @@
1616
import { expect, describe, it, vi, beforeEach, afterEach, MockInstance } from 'vitest';
1717

1818
import { EventWithId, BatchEventProcessor, LOGGER_NAME } from './batch_event_processor';
19-
import { getMockSyncCache } from '../tests/mock/mock_cache';
19+
import { getMockAsyncCache, getMockSyncCache } from '../tests/mock/mock_cache';
2020
import { createImpressionEvent } from '../tests/mock/create_event';
2121
import { ProcessableEvent } from './event_processor';
2222
import { buildLogEvent } from './event_builder/log_event';
23-
import { resolvablePromise } from '../utils/promise/resolvablePromise';
23+
import { ResolvablePromise, resolvablePromise } from '../utils/promise/resolvablePromise';
2424
import { advanceTimersByTime } from '../tests/testUtils';
2525
import { getMockLogger } from '../tests/mock/mock_logger';
2626
import { getMockRepeater } from '../tests/mock/mock_repeater';
2727
import * as retry from '../utils/executor/backoff_retry_runner';
2828
import { ServiceState, StartupLog } from '../service';
2929
import { LogLevel } from '../logging/logger';
30+
import { IdGenerator } from '../utils/id_generator';
3031

3132
const getMockDispatcher = () => {
3233
return {
@@ -366,6 +367,160 @@ describe('BatchEventProcessor', async () => {
366367

367368
expect(events).toEqual(eventsInStore);
368369
});
370+
371+
it('should not store the event in the eventStore but still dispatch if the \
372+
number of pending events is greater than the limit', async () => {
373+
const eventDispatcher = getMockDispatcher();
374+
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
375+
mockDispatch.mockResolvedValue(resolvablePromise().promise);
376+
377+
const eventStore = getMockSyncCache<EventWithId>();
378+
379+
const idGenerator = new IdGenerator();
380+
381+
for (let i = 0; i < 505; i++) {
382+
const event = createImpressionEvent(`id-${i}`);
383+
const cacheId = idGenerator.getId();
384+
await eventStore.set(cacheId, { id: cacheId, event });
385+
}
386+
387+
expect(eventStore.size()).toEqual(505);
388+
389+
const processor = new BatchEventProcessor({
390+
eventDispatcher,
391+
dispatchRepeater: getMockRepeater(),
392+
batchSize: 1,
393+
eventStore,
394+
});
395+
396+
processor.start();
397+
await processor.onRunning();
398+
399+
const events: ProcessableEvent[] = [];
400+
for(let i = 0; i < 2; i++) {
401+
const event = createImpressionEvent(`id-${i}`);
402+
events.push(event);
403+
await processor.process(event)
404+
}
405+
406+
expect(eventStore.size()).toEqual(505);
407+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(507);
408+
expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[0]]));
409+
expect(eventDispatcher.dispatchEvent.mock.calls[506][0]).toEqual(buildLogEvent([events[1]]));
410+
});
411+
412+
it('should store events in the eventStore when the number of events in the store\
413+
becomes lower than the limit', async () => {
414+
const eventDispatcher = getMockDispatcher();
415+
416+
const dispatchResponses: ResolvablePromise<any>[] = [];
417+
418+
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
419+
mockDispatch.mockImplementation((arg) => {
420+
const dispatchResponse = resolvablePromise();
421+
dispatchResponses.push(dispatchResponse);
422+
return dispatchResponse.promise;
423+
});
424+
425+
const eventStore = getMockSyncCache<EventWithId>();
426+
427+
const idGenerator = new IdGenerator();
428+
429+
for (let i = 0; i < 502; i++) {
430+
const event = createImpressionEvent(`id-${i}`);
431+
const cacheId = String(i);
432+
await eventStore.set(cacheId, { id: cacheId, event });
433+
}
434+
435+
expect(eventStore.size()).toEqual(502);
436+
437+
const processor = new BatchEventProcessor({
438+
eventDispatcher,
439+
dispatchRepeater: getMockRepeater(),
440+
batchSize: 1,
441+
eventStore,
442+
});
443+
444+
processor.start();
445+
await processor.onRunning();
446+
447+
let events: ProcessableEvent[] = [];
448+
for(let i = 0; i < 2; i++) {
449+
const event = createImpressionEvent(`id-${i + 502}`);
450+
events.push(event);
451+
await processor.process(event)
452+
}
453+
454+
expect(eventStore.size()).toEqual(502);
455+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(504);
456+
457+
expect(eventDispatcher.dispatchEvent.mock.calls[502][0]).toEqual(buildLogEvent([events[0]]));
458+
expect(eventDispatcher.dispatchEvent.mock.calls[503][0]).toEqual(buildLogEvent([events[1]]));
459+
460+
// resolve the dispatch for events not saved in the store
461+
dispatchResponses[502].resolve({ statusCode: 200 });
462+
dispatchResponses[503].resolve({ statusCode: 200 });
463+
464+
await exhaustMicrotasks();
465+
expect(eventStore.size()).toEqual(502);
466+
467+
// resolve the dispatch for 3 events in store, making the store size 499 which is lower than the limit
468+
dispatchResponses[0].resolve({ statusCode: 200 });
469+
dispatchResponses[1].resolve({ statusCode: 200 });
470+
dispatchResponses[2].resolve({ statusCode: 200 });
471+
472+
await exhaustMicrotasks();
473+
expect(eventStore.size()).toEqual(499);
474+
475+
// process 2 more events
476+
events = [];
477+
for(let i = 0; i < 2; i++) {
478+
const event = createImpressionEvent(`id-${i + 504}`);
479+
events.push(event);
480+
await processor.process(event)
481+
}
482+
483+
expect(eventStore.size()).toEqual(500);
484+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(506);
485+
expect(eventDispatcher.dispatchEvent.mock.calls[504][0]).toEqual(buildLogEvent([events[0]]));
486+
expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[1]]));
487+
});
488+
489+
it('should still dispatch events even if the store save fails', async () => {
490+
const eventDispatcher = getMockDispatcher();
491+
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
492+
mockDispatch.mockResolvedValue({});
493+
494+
const eventStore = getMockAsyncCache<EventWithId>();
495+
// Simulate failure in saving to store
496+
eventStore.set = vi.fn().mockRejectedValue(new Error('Failed to save'));
497+
498+
const dispatchRepeater = getMockRepeater();
499+
500+
const processor = new BatchEventProcessor({
501+
eventDispatcher,
502+
dispatchRepeater,
503+
batchSize: 100,
504+
eventStore,
505+
});
506+
507+
processor.start();
508+
await processor.onRunning();
509+
510+
const events: ProcessableEvent[] = [];
511+
for(let i = 0; i < 10; i++) {
512+
const event = createImpressionEvent(`id-${i}`);
513+
events.push(event);
514+
await processor.process(event)
515+
}
516+
517+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(0);
518+
519+
await dispatchRepeater.execute(0);
520+
521+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
522+
expect(eventDispatcher.dispatchEvent.mock.calls[0][0]).toEqual(buildLogEvent(events));
523+
});
369524
});
370525

371526
it('should dispatch events when dispatchRepeater is triggered', async () => {

lib/event_processor/batch_event_processor.ts

+60-19
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ import { EventProcessor, ProcessableEvent } from "./event_processor";
1818
import { getBatchedAsync, getBatchedSync, Store } from "../utils/cache/store";
1919
import { EventDispatcher, EventDispatcherResponse, LogEvent } from "./event_dispatcher/event_dispatcher";
2020
import { buildLogEvent } from "./event_builder/log_event";
21-
import { BackoffController, ExponentialBackoff, IntervalRepeater, Repeater } from "../utils/repeater/repeater";
21+
import { BackoffController, ExponentialBackoff, Repeater } from "../utils/repeater/repeater";
2222
import { LoggerFacade } from '../logging/logger';
2323
import { BaseService, ServiceState, StartupLog } from "../service";
24-
import { Consumer, Fn, Producer } from "../utils/type";
24+
import { Consumer, Fn, Maybe, Producer } from "../utils/type";
2525
import { RunResult, runWithRetry } from "../utils/executor/backoff_retry_runner";
2626
import { isSuccessStatusCode } from "../utils/http_request_handler/http_util";
2727
import { EventEmitter } from "../utils/event_emitter/event_emitter";
@@ -31,13 +31,16 @@ import { FAILED_TO_DISPATCH_EVENTS, SERVICE_NOT_RUNNING } from "error_message";
3131
import { OptimizelyError } from "../error/optimizly_error";
3232
import { sprintf } from "../utils/fns";
3333
import { SERVICE_STOPPED_BEFORE_RUNNING } from "../service";
34+
import { EVENT_STORE_FULL } from "../message/log_message";
3435

3536
export const DEFAULT_MIN_BACKOFF = 1000;
3637
export const DEFAULT_MAX_BACKOFF = 32000;
38+
export const MAX_EVENTS_IN_STORE = 500;
3739

3840
export type EventWithId = {
3941
id: string;
4042
event: ProcessableEvent;
43+
notStored?: boolean;
4144
};
4245

4346
export type RetryConfig = {
@@ -59,7 +62,8 @@ export type BatchEventProcessorConfig = {
5962

6063
type EventBatch = {
6164
request: LogEvent,
62-
ids: string[],
65+
// ids: string[],
66+
events: EventWithId[],
6367
}
6468

6569
export const LOGGER_NAME = 'BatchEventProcessor';
@@ -70,11 +74,13 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
7074
private eventQueue: EventWithId[] = [];
7175
private batchSize: number;
7276
private eventStore?: Store<EventWithId>;
77+
private eventCountInStore: Maybe<number> = undefined;
78+
private maxEventsInStore: number = MAX_EVENTS_IN_STORE;
7379
private dispatchRepeater: Repeater;
7480
private failedEventRepeater?: Repeater;
7581
private idGenerator: IdGenerator = new IdGenerator();
7682
private runningTask: Map<string, RunResult<EventDispatcherResponse>> = new Map();
77-
private dispatchingEventIds: Set<string> = new Set();
83+
private dispatchingEvents: Map<string, EventWithId> = new Map();
7884
private eventEmitter: EventEmitter<{ dispatch: LogEvent }> = new EventEmitter();
7985
private retryConfig?: RetryConfig;
8086

@@ -84,11 +90,13 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
8490
this.closingEventDispatcher = config.closingEventDispatcher;
8591
this.batchSize = config.batchSize;
8692
this.eventStore = config.eventStore;
93+
8794
this.retryConfig = config.retryConfig;
8895

8996
this.dispatchRepeater = config.dispatchRepeater;
9097
this.dispatchRepeater.setTask(() => this.flush());
9198

99+
this.maxEventsInStore = Math.max(2 * config.batchSize, MAX_EVENTS_IN_STORE);
92100
this.failedEventRepeater = config.failedEventRepeater;
93101
this.failedEventRepeater?.setTask(() => this.retryFailedEvents());
94102
if (config.logger) {
@@ -111,7 +119,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
111119
}
112120

113121
const keys = (await this.eventStore.getKeys()).filter(
114-
(k) => !this.dispatchingEventIds.has(k) && !this.eventQueue.find((e) => e.id === k)
122+
(k) => !this.dispatchingEvents.has(k) && !this.eventQueue.find((e) => e.id === k)
115123
);
116124

117125
const events = await (this.eventStore.operation === 'sync' ?
@@ -138,7 +146,8 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
138146
(currentBatch.length > 0 && !areEventContextsEqual(currentBatch[0].event, event.event))) {
139147
batches.push({
140148
request: buildLogEvent(currentBatch.map((e) => e.event)),
141-
ids: currentBatch.map((e) => e.id),
149+
// ids: currentBatch.map((e) => e.id),
150+
events: currentBatch,
142151
});
143152
currentBatch = [];
144153
}
@@ -148,7 +157,8 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
148157
if (currentBatch.length > 0) {
149158
batches.push({
150159
request: buildLogEvent(currentBatch.map((e) => e.event)),
151-
ids: currentBatch.map((e) => e.id),
160+
// ids: currentBatch.map((e) => e.id),
161+
events: currentBatch,
152162
});
153163
}
154164

@@ -163,15 +173,15 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
163173
}
164174

165175
const events: ProcessableEvent[] = [];
166-
const ids: string[] = [];
176+
const eventWithIds: EventWithId[] = [];
167177

168178
this.eventQueue.forEach((event) => {
169179
events.push(event.event);
170-
ids.push(event.id);
180+
eventWithIds.push(event);
171181
});
172182

173183
this.eventQueue = [];
174-
return { request: buildLogEvent(events), ids };
184+
return { request: buildLogEvent(events), events: eventWithIds };
175185
}
176186

177187
private async executeDispatch(request: LogEvent, closing = false): Promise<EventDispatcherResponse> {
@@ -185,10 +195,11 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
185195
}
186196

187197
private dispatchBatch(batch: EventBatch, closing: boolean): void {
188-
const { request, ids } = batch;
198+
const { request, events } = batch;
189199

190-
ids.forEach((id) => {
191-
this.dispatchingEventIds.add(id);
200+
events.forEach((event) => {
201+
// this.dispatchingEventIds.add(id);
202+
this.dispatchingEvents.set(event.id, event);
192203
});
193204

194205
const runResult: RunResult<EventDispatcherResponse> = this.retryConfig
@@ -205,9 +216,11 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
205216
this.runningTask.set(taskId, runResult);
206217

207218
runResult.result.then((res) => {
208-
ids.forEach((id) => {
209-
this.dispatchingEventIds.delete(id);
210-
this.eventStore?.remove(id);
219+
events.forEach((event) => {
220+
this.eventStore?.remove(event.id);
221+
if (!event.notStored && this.eventCountInStore) {
222+
this.eventCountInStore--;
223+
}
211224
});
212225
return Promise.resolve();
213226
}).catch((err) => {
@@ -216,7 +229,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
216229
this.logger?.error(err);
217230
}).finally(() => {
218231
this.runningTask.delete(taskId);
219-
ids.forEach((id) => this.dispatchingEventIds.delete(id));
232+
events.forEach((event) => this.dispatchingEvents.delete(event.id));
220233
});
221234
}
222235

@@ -235,12 +248,12 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
235248
return Promise.reject(new OptimizelyError(SERVICE_NOT_RUNNING, 'BatchEventProcessor'));
236249
}
237250

238-
const eventWithId = {
251+
const eventWithId: EventWithId = {
239252
id: this.idGenerator.getId(),
240253
event: event,
241254
};
242255

243-
await this.eventStore?.set(eventWithId.id, eventWithId);
256+
await this.storeEvent(eventWithId);
244257

245258
if (this.eventQueue.length > 0 && !areEventContextsEqual(this.eventQueue[0].event, event)) {
246259
this.flush();
@@ -253,7 +266,35 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
253266
} else if (!this.dispatchRepeater.isRunning()) {
254267
this.dispatchRepeater.start();
255268
}
269+
}
270+
271+
private async findEventCountInStore(): Promise<void> {
272+
if (this.eventStore && this.eventCountInStore === undefined) {
273+
try {
274+
const keys = await this.eventStore.getKeys();
275+
this.eventCountInStore = keys.length;
276+
} catch (e) {
277+
this.logger?.error(e);
278+
}
279+
}
280+
}
256281

282+
private async storeEvent(eventWithId: EventWithId): Promise<void> {
283+
await this.findEventCountInStore();
284+
if (this.eventCountInStore !== undefined && this.eventCountInStore >= this.maxEventsInStore) {
285+
this.logger?.info(EVENT_STORE_FULL, eventWithId.event.uuid);
286+
eventWithId.notStored = true;
287+
return;
288+
}
289+
290+
await Promise.resolve(this.eventStore?.set(eventWithId.id, eventWithId)).then(() => {
291+
if (this.eventCountInStore !== undefined) {
292+
this.eventCountInStore++;
293+
}
294+
}).catch((e) => {
295+
eventWithId.notStored = true;
296+
this.logger?.error(e);
297+
});
257298
}
258299

259300
start(): void {

lib/message/log_message.ts

+1
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,6 @@ export const USER_HAS_NO_FORCED_VARIATION_FOR_EXPERIMENT =
6060
'No experiment %s mapped to user %s in the forced variation map.';
6161
export const INVALID_EXPERIMENT_KEY_INFO =
6262
'Experiment key %s is not in datafile. It is either invalid, paused, or archived.';
63+
export const EVENT_STORE_FULL = 'Event store is full. Not saving event with id %d.';
6364

6465
export const messages: string[] = [];

0 commit comments

Comments
 (0)