@@ -5,16 +5,51 @@ import { resolveUrl } from '../url-utils';
55import { UnleashEvents } from '../events' ;
66import { EventSource } from '../event-source' ;
77import { FetcherInterface , StreamingFetchingOptions } from './fetcher' ;
8+ import { FailEvent , FailoverStrategy } from './streaming-fail-over' ;
89
910export class StreamingFetcher extends EventEmitter implements FetcherInterface {
1011 private eventSource : EventSource | undefined ;
1112
12- private options : StreamingFetchingOptions ;
13+ private readonly url : string ;
1314
14- constructor ( options : StreamingFetchingOptions ) {
15+ private readonly appName : string ;
16+
17+ private readonly instanceId : string ;
18+
19+ private readonly headers ?: Record < string , string > ;
20+
21+ private readonly connectionId ?: string ;
22+
23+ private readonly onSaveDelta : StreamingFetchingOptions [ 'onSaveDelta' ] ;
24+
25+ private readonly onModeChange ?: StreamingFetchingOptions [ 'onModeChange' ] ;
26+
27+ private readonly failoverStrategy : FailoverStrategy ;
28+
29+ constructor ( {
30+ url,
31+ appName,
32+ instanceId,
33+ headers,
34+ connectionId,
35+ eventSource,
36+ maxFailuresUntilFailover = 5 ,
37+ failureWindowMs = 60_000 ,
38+ onSaveDelta,
39+ onModeChange,
40+ } : StreamingFetchingOptions ) {
1541 super ( ) ;
16- this . options = options ;
17- this . eventSource = options . eventSource ;
42+
43+ this . url = url ;
44+ this . appName = appName ;
45+ this . instanceId = instanceId ;
46+ this . headers = headers ;
47+ this . connectionId = connectionId ;
48+ this . onSaveDelta = onSaveDelta ;
49+ this . onModeChange = onModeChange ;
50+
51+ this . eventSource = eventSource ;
52+ this . failoverStrategy = new FailoverStrategy ( maxFailuresUntilFailover , failureWindowMs ) ;
1853 }
1954
2055 private setupEventSource ( ) {
@@ -23,46 +58,89 @@ export class StreamingFetcher extends EventEmitter implements FetcherInterface {
2358 await this . handleFlagsFromStream ( event ) ;
2459 } ) ;
2560 this . eventSource . addEventListener ( 'unleash-updated' , this . handleFlagsFromStream . bind ( this ) ) ;
26- this . eventSource . addEventListener ( 'error' , ( error : unknown ) => {
27- this . emit ( UnleashEvents . Warn , error ) ;
28- } ) ;
29- this . eventSource . addEventListener ( 'end' , ( error : unknown ) => {
30- this . emit ( UnleashEvents . Warn , error ) ;
31- } ) ;
61+ this . eventSource . addEventListener ( 'error' , this . handleErrorEvent . bind ( this ) ) ;
62+ this . eventSource . addEventListener ( 'end' , this . handleServerDisconnect . bind ( this ) ) ;
3263 this . eventSource . addEventListener ( 'fetch-mode' , this . handleModeChange . bind ( this ) ) ;
3364 }
3465 }
3566
67+ private async handleErrorEvent ( error : any ) : Promise < void > {
68+ const now = new Date ( ) ;
69+
70+ const failEvent : FailEvent =
71+ typeof error ?. status === 'number'
72+ ? {
73+ type : 'http-status-error' ,
74+ message : error . message ?? `Stream failed with http status code ${ error . status } ` ,
75+ statusCode : error . status ,
76+ occurredAt : now ,
77+ }
78+ : {
79+ type : 'network-error' ,
80+ message : error . message ?? 'Network error occurred in streaming' ,
81+ occurredAt : now ,
82+ } ;
83+
84+ await this . handleFailoverDecision ( failEvent ) ;
85+ }
86+
87+ private async handleServerDisconnect ( ) : Promise < void > {
88+ const failEvent : FailEvent = {
89+ type : 'network-error' ,
90+ message : 'Server closed the streaming connection' ,
91+ occurredAt : new Date ( ) ,
92+ } ;
93+
94+ await this . handleFailoverDecision ( failEvent ) ;
95+ }
96+
97+ private async handleFailoverDecision ( event : FailEvent ) : Promise < void > {
98+ const now = new Date ( ) ;
99+ const shouldFailover = this . failoverStrategy . shouldFailover ( event , now ) ;
100+
101+ if ( ! shouldFailover ) {
102+ return ;
103+ }
104+
105+ this . emit ( UnleashEvents . Warn , event . message ) ;
106+
107+ if ( this . onModeChange ) {
108+ await this . onModeChange ( 'polling' ) ;
109+ }
110+ }
111+
36112 private async handleFlagsFromStream ( event : { data : string } ) {
37113 try {
38114 const data = parseClientFeaturesDelta ( JSON . parse ( event . data ) ) ;
39- await this . options . onSaveDelta ( data ) ;
115+ await this . onSaveDelta ( data ) ;
40116 } catch ( err ) {
41117 this . emit ( UnleashEvents . Error , err ) ;
42118 }
43119 }
44120
45121 private async handleModeChange ( event : { data : string } ) {
46- try {
47- const newMode = event . data as 'polling' | 'streaming' ;
48- if ( this . options . onModeChange ) {
49- await this . options . onModeChange ( newMode ) ;
50- }
51- } catch ( err ) {
52- this . emit ( UnleashEvents . Error , new Error ( `Failed to handle mode change: ${ err } ` ) ) ;
122+ const newMode = event . data as 'polling' | 'streaming' ;
123+
124+ if ( newMode === 'polling' ) {
125+ await this . handleFailoverDecision ( {
126+ type : 'server-hint' ,
127+ event : `polling` ,
128+ message : 'Server has explicitly requested switching to polling mode' ,
129+ occurredAt : new Date ( ) ,
130+ } ) ;
53131 }
54132 }
55133
56134 private createEventSource ( ) : EventSource {
57- return new EventSource ( resolveUrl ( this . options . url , './client/streaming' ) , {
135+ return new EventSource ( resolveUrl ( this . url , './client/streaming' ) , {
58136 headers : buildHeaders ( {
59- appName : this . options . appName ,
60- instanceId : this . options . instanceId ,
137+ appName : this . appName ,
138+ instanceId : this . instanceId ,
61139 etag : undefined ,
62140 contentType : undefined ,
63- custom : this . options . headers ,
141+ custom : this . headers ,
64142 specVersionSupported : '5.2.0' ,
65- connectionId : this . options . connectionId ,
143+ connectionId : this . connectionId ,
66144 } ) ,
67145 readTimeoutMillis : 60000 ,
68146 initialRetryDelayMillis : 2000 ,
0 commit comments