Skip to content

Commit 04b3c11

Browse files
authored
chore: unwired up failover strategy (#780)
* chore: adds a failover strategy component that we'll use to deal with failures in streaming
* chore: move streaming code into repo folder (#781)
* chore: move repository tests into repository folder
* feat: bolt in failover strategy (#782)
* chore: expose config for failover (#783)
1 parent 37ff3c8 commit 04b3c11

File tree

6 files changed

+431
-64
lines changed

6 files changed

+431
-64
lines changed

src/repository/fetcher.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,6 @@ export interface PollingFetchingOptions extends CommonFetchingOptions {
3434

3535
/**
 * Options accepted by the streaming fetcher, in addition to everything in
 * CommonFetchingOptions.
 */
export interface StreamingFetchingOptions extends CommonFetchingOptions {
3636
// Optional event source supplied by the caller; StreamingFetcher stores it as
// its own event source at construction time.
eventSource?: EventSource;
37+
// Passed to FailoverStrategy as its failure-count threshold (`maxFails`).
// StreamingFetcher falls back to 5 when this is omitted.
maxFailuresUntilFailover?: number;
38+
// Passed to FailoverStrategy as its sliding-window length in milliseconds
// (`relaxTimeMs`). StreamingFetcher falls back to 60_000 when this is omitted.
failureWindowMs?: number;
3739
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/**
 * Any signal from the streaming connection that can contribute to a decision
 * to fail over. The `type` field discriminates the variants.
 */
export type FailEvent = NetworkEventError | HttpStatusError | ServerEvent;

/** Fields shared by every fail event. */
type BaseFailEvent = { occurredAt: Date; message: string };

/** A transport-level failure with no HTTP status attached. */
type NetworkEventError = BaseFailEvent & {
  type: 'network-error';
};

/** A failure that carries the HTTP status code returned by the server. */
type HttpStatusError = BaseFailEvent & {
  statusCode: number;
  type: 'http-status-error';
};

/** A named event sent by the server; `event` holds the hint name. */
type ServerEvent = BaseFailEvent & {
  event: string;
  type: 'server-hint';
};

// Server hints that trigger an immediate failover.
const FAILOVER_SERVER_HINTS: ReadonlySet<string> = new Set(['polling']);

// Explicitly including 429 here: this is used by Unleash to signal to the SDK
// that there are already too many streaming clients connected and it should
// switch to polling.
const HARD_FAILOVER_STATUS_CODES: ReadonlySet<number> = new Set([401, 403, 404, 429, 501]);

// Status codes that only count toward the sliding failure window.
const SOFT_FAILOVER_STATUS_CODES: ReadonlySet<number> = new Set([408, 500, 502, 503, 504]);

/**
 * Decides whether a failure should cause a failover, using immediate rules
 * for hard failures and a sliding window of recent failures for soft ones.
 */
export class FailoverStrategy {
  // Soft failures recorded inside the current window, oldest first.
  private failures: FailEvent[] = [];

  /**
   * @param maxFails number of windowed failures at which failover is signalled
   * @param relaxTimeMs window length in milliseconds; recorded failures older
   *   than this (relative to `now`) are forgotten
   */
  constructor(
    private readonly maxFails: number,
    private readonly relaxTimeMs: number,
  ) {}

  /**
   * Evaluates one fail event.
   *
   * @param event the failure that just occurred
   * @param now reference time used to expire old failures; defaults to the
   *   current time
   * @returns true when the caller should fail over, false otherwise
   */
  shouldFailover(event: FailEvent, now: Date = new Date()): boolean {
    // Expire stale failures first so the new event is judged against the
    // current window only.
    this.pruneOldFailures(now.getTime());

    switch (event.type) {
      case 'http-status-error':
        return this.handleHttpStatus(event);

      case 'server-hint':
        return this.handleServerEvent(event);

      case 'network-error':
        return this.handleNetwork(event);

      default: {
        // Compile-time exhaustiveness check. At runtime an unrecognised event
        // type must still yield a boolean, so it is treated as "no failover".
        const unhandled: never = event;
        void unhandled;
        return false;
      }
    }
  }

  /** Fails over immediately only for hints listed in FAILOVER_SERVER_HINTS. */
  private handleServerEvent(event: ServerEvent): boolean {
    // Things like hard disconnects are triggered by rolling restarts or
    // explicit shutdown. We expect Unleash to come back after such events so
    // we ignore them here. If Unleash doesn't come back up, it'll be handled
    // by the HTTP status events at some point in the near future.
    return FAILOVER_SERVER_HINTS.has(event.event);
  }

  // Network shenanigans always contribute to failover but never decide it
  // immediately: it is impossible to know whether things will get better, so
  // they go through the sliding window.
  private handleNetwork(event: NetworkEventError): boolean {
    return this.hasTooManyFails(event);
  }

  /**
   * Hard status codes fail over at once, soft ones are counted in the window,
   * and any other status code is ignored (neither counted nor failing over).
   */
  private handleHttpStatus(event: HttpStatusError): boolean {
    if (HARD_FAILOVER_STATUS_CODES.has(event.statusCode)) {
      return true;
    }
    if (SOFT_FAILOVER_STATUS_CODES.has(event.statusCode)) {
      return this.hasTooManyFails(event);
    }
    return false;
  }

  /** Records the event and reports whether the window has reached `maxFails`. */
  private hasTooManyFails(event: FailEvent): boolean {
    this.failures.push(event);
    return this.failures.length >= this.maxFails;
  }

  // Because SSE doesn't have a success event, we only prune on new failures.
  // That gives us a sliding window of recent failures. A failure exactly at
  // the cutoff is still considered inside the window.
  private pruneOldFailures(nowMs: number): void {
    const cutoff = nowMs - this.relaxTimeMs;
    this.failures = this.failures.filter((failure) => failure.occurredAt.getTime() >= cutoff);
  }
}

src/repository/streaming-fetcher.ts

Lines changed: 101 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,51 @@ import { resolveUrl } from '../url-utils';
55
import { UnleashEvents } from '../events';
66
import { EventSource } from '../event-source';
77
import { FetcherInterface, StreamingFetchingOptions } from './fetcher';
8+
import { FailEvent, FailoverStrategy } from './streaming-fail-over';
89

910
export class StreamingFetcher extends EventEmitter implements FetcherInterface {
1011
private eventSource: EventSource | undefined;
1112

12-
private options: StreamingFetchingOptions;
13+
private readonly url: string;
1314

14-
constructor(options: StreamingFetchingOptions) {
15+
private readonly appName: string;
16+
17+
private readonly instanceId: string;
18+
19+
private readonly headers?: Record<string, string>;
20+
21+
private readonly connectionId?: string;
22+
23+
private readonly onSaveDelta: StreamingFetchingOptions['onSaveDelta'];
24+
25+
private readonly onModeChange?: StreamingFetchingOptions['onModeChange'];
26+
27+
private readonly failoverStrategy: FailoverStrategy;
28+
29+
constructor({
30+
url,
31+
appName,
32+
instanceId,
33+
headers,
34+
connectionId,
35+
eventSource,
36+
maxFailuresUntilFailover = 5,
37+
failureWindowMs = 60_000,
38+
onSaveDelta,
39+
onModeChange,
40+
}: StreamingFetchingOptions) {
1541
super();
16-
this.options = options;
17-
this.eventSource = options.eventSource;
42+
43+
this.url = url;
44+
this.appName = appName;
45+
this.instanceId = instanceId;
46+
this.headers = headers;
47+
this.connectionId = connectionId;
48+
this.onSaveDelta = onSaveDelta;
49+
this.onModeChange = onModeChange;
50+
51+
this.eventSource = eventSource;
52+
this.failoverStrategy = new FailoverStrategy(maxFailuresUntilFailover, failureWindowMs);
1853
}
1954

2055
private setupEventSource() {
@@ -23,46 +58,89 @@ export class StreamingFetcher extends EventEmitter implements FetcherInterface {
2358
await this.handleFlagsFromStream(event);
2459
});
2560
this.eventSource.addEventListener('unleash-updated', this.handleFlagsFromStream.bind(this));
26-
this.eventSource.addEventListener('error', (error: unknown) => {
27-
this.emit(UnleashEvents.Warn, error);
28-
});
29-
this.eventSource.addEventListener('end', (error: unknown) => {
30-
this.emit(UnleashEvents.Warn, error);
31-
});
61+
this.eventSource.addEventListener('error', this.handleErrorEvent.bind(this));
62+
this.eventSource.addEventListener('end', this.handleServerDisconnect.bind(this));
3263
this.eventSource.addEventListener('fetch-mode', this.handleModeChange.bind(this));
3364
}
3465
}
3566

67+
/**
 * Listener for the event source's 'error' event. Converts the raw error into a
 * FailEvent and hands it to the failover decision.
 *
 * An error carrying a numeric `status` becomes an 'http-status-error';
 * anything else (including a missing or non-object payload) becomes a
 * 'network-error'.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- payload shape is defined by the event source
private async handleErrorEvent(error: any): Promise<void> {
  const occurredAt = new Date();

  let failEvent: FailEvent;
  if (typeof error?.status === 'number') {
    failEvent = {
      type: 'http-status-error',
      message: error?.message ?? `Stream failed with http status code ${error.status}`,
      statusCode: error.status,
      occurredAt,
    };
  } else {
    failEvent = {
      type: 'network-error',
      // `error` may be null/undefined here, so the read must be guarded just
      // like the status check above.
      message: error?.message ?? 'Network error occurred in streaming',
      occurredAt,
    };
  }

  await this.handleFailoverDecision(failEvent);
}
86+
87+
/**
 * Listener for the event source's 'end' event. Reports the disconnect to the
 * failover decision as a 'network-error' stamped with the current time.
 */
private async handleServerDisconnect(): Promise<void> {
  await this.handleFailoverDecision({
    type: 'network-error',
    message: 'Server closed the streaming connection',
    occurredAt: new Date(),
  });
}
96+
97+
/**
 * Asks the failover strategy whether `event` warrants a failover. If it does,
 * emits a warning with the event's message and, when an `onModeChange`
 * callback was provided, asks it to switch to 'polling'.
 *
 * This runs from event-source listeners, so a rejection here would surface as
 * an unhandled promise rejection. A failing `onModeChange` is therefore
 * reported through the Error event instead of being allowed to propagate.
 */
private async handleFailoverDecision(event: FailEvent): Promise<void> {
  if (!this.failoverStrategy.shouldFailover(event, new Date())) {
    return;
  }

  this.emit(UnleashEvents.Warn, event.message);

  if (!this.onModeChange) {
    return;
  }

  try {
    await this.onModeChange('polling');
  } catch (err) {
    this.emit(UnleashEvents.Error, new Error(`Failed to handle mode change: ${err}`));
  }
}
111+
36112
private async handleFlagsFromStream(event: { data: string }) {
37113
try {
38114
const data = parseClientFeaturesDelta(JSON.parse(event.data));
39-
await this.options.onSaveDelta(data);
115+
await this.onSaveDelta(data);
40116
} catch (err) {
41117
this.emit(UnleashEvents.Error, err);
42118
}
43119
}
44120

45121
private async handleModeChange(event: { data: string }) {
46-
try {
47-
const newMode = event.data as 'polling' | 'streaming';
48-
if (this.options.onModeChange) {
49-
await this.options.onModeChange(newMode);
50-
}
51-
} catch (err) {
52-
this.emit(UnleashEvents.Error, new Error(`Failed to handle mode change: ${err}`));
122+
const newMode = event.data as 'polling' | 'streaming';
123+
124+
if (newMode === 'polling') {
125+
await this.handleFailoverDecision({
126+
type: 'server-hint',
127+
event: `polling`,
128+
message: 'Server has explicitly requested switching to polling mode',
129+
occurredAt: new Date(),
130+
});
53131
}
54132
}
55133

56134
private createEventSource(): EventSource {
57-
return new EventSource(resolveUrl(this.options.url, './client/streaming'), {
135+
return new EventSource(resolveUrl(this.url, './client/streaming'), {
58136
headers: buildHeaders({
59-
appName: this.options.appName,
60-
instanceId: this.options.instanceId,
137+
appName: this.appName,
138+
instanceId: this.instanceId,
61139
etag: undefined,
62140
contentType: undefined,
63-
custom: this.options.headers,
141+
custom: this.headers,
64142
specVersionSupported: '5.2.0',
65-
connectionId: this.options.connectionId,
143+
connectionId: this.connectionId,
66144
}),
67145
readTimeoutMillis: 60000,
68146
initialRetryDelayMillis: 2000,

src/test/repository.test.ts renamed to src/test/repository/repository.test.ts

Lines changed: 7 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ import { writeFileSync } from 'fs';
44
import { tmpdir } from 'os';
55
import { join } from 'path';
66

7-
import InMemStorageProvider from '../repository/storage-provider-in-mem';
8-
import FileStorageProvider from '../repository/storage-provider-file';
9-
import Repository from '../repository';
10-
import { DefaultBootstrapProvider } from '../repository/bootstrap-provider';
11-
import { StorageProvider } from '../repository/storage-provider';
12-
import { ClientFeaturesResponse, DeltaEvent } from '../feature';
7+
import InMemStorageProvider from '../../repository/storage-provider-in-mem';
8+
import FileStorageProvider from '../../repository/storage-provider-file';
9+
import Repository from '../../repository';
10+
import { DefaultBootstrapProvider } from '../../repository/bootstrap-provider';
11+
import { StorageProvider } from '../../repository/storage-provider';
12+
import { ClientFeaturesResponse, DeltaEvent } from '../../feature';
1313
import { EventEmitter } from 'events';
1414

1515
const appName = 'foo';
@@ -1443,7 +1443,7 @@ test('Stopping repository should stop storage provider updates', async (t) => {
14431443
});
14441444

14451445
test('Streaming deltas', async (t) => {
1446-
t.plan(8);
1446+
t.plan(5);
14471447
const url = 'http://unleash-test-streaming.app';
14481448
const feature = {
14491449
name: 'feature',
@@ -1556,40 +1556,6 @@ test('Streaming deltas', async (t) => {
15561556
repo.on('warn', (msg) => {
15571557
recordedWarnings.push(msg);
15581558
});
1559-
// SSE error translated to repo warning
1560-
eventSource.emit('error', 'some error');
1561-
1562-
// SSE end connection translated to repo warning
1563-
eventSource.emit('end', 'server ended connection');
1564-
t.deepEqual(recordedWarnings, ['some error', 'server ended connection']);
1565-
1566-
// re-connect simulation
1567-
eventSource.emit('unleash-connected', {
1568-
type: 'unleash-connected',
1569-
data: JSON.stringify({
1570-
events: [
1571-
{
1572-
type: 'hydration',
1573-
eventId: 6,
1574-
features: [{ ...feature, name: 'reconnectUpdate' }],
1575-
segments: [],
1576-
},
1577-
],
1578-
}),
1579-
});
1580-
const reconnectUpdate = repo.getToggles();
1581-
t.deepEqual(reconnectUpdate, [{ ...feature, name: 'reconnectUpdate' }]);
1582-
1583-
// Invalid data error translated to repo error
1584-
repo.on('error', (error) => {
1585-
t.true(error.message.startsWith(`Invalid delta response:`));
1586-
});
1587-
eventSource.emit('unleash-updated', {
1588-
type: 'unleash-updated',
1589-
data: JSON.stringify({
1590-
incorrectEvents: [],
1591-
}),
1592-
});
15931559
});
15941560

15951561
function setupPollingDeltaApi(url: string, events: DeltaEvent[]) {

0 commit comments

Comments
 (0)