@@ -2,60 +2,16 @@ use crate::{
22 core:: utils:: globals:: { AUTH_SERVERS , CDN_SERVERS } ,
33 log_info, log_warn,
44} ;
5- use backoff:: { future:: retry, ExponentialBackoff } ;
65use reqwest:: Client ;
7- use std:: sync:: atomic :: { AtomicUsize , Ordering } ;
8- use std:: sync :: { Arc , LazyLock , Mutex , RwLock } ;
9- use std :: time :: { Duration , Instant } ;
6+ use std:: sync:: { LazyLock , Mutex , RwLock } ;
7+ use std:: time :: Duration ;
8+ use tokio :: sync :: watch ;
109
11- const REQUEST_TIMEOUT : Duration = Duration :: from_secs ( 5 ) ;
12- const CB_MAX_FAILURES : usize = 3 ;
13- const CB_RESET_WINDOW : Duration = Duration :: from_secs ( 60 ) ;
14- const BACKOFF_MAX_ELAPSED : Duration = Duration :: from_secs ( 10 ) ;
15-
16- #[ derive( Debug ) ]
17- pub struct CircuitBreaker {
18- failures : AtomicUsize ,
19- last_failure : Mutex < Option < Instant > > ,
20- }
21-
22- impl CircuitBreaker {
23- fn new ( ) -> Self {
24- Self {
25- failures : AtomicUsize :: new ( 0 ) ,
26- last_failure : Mutex :: new ( None ) ,
27- }
28- }
29-
30- fn record_failure ( & self ) {
31- self . failures . fetch_add ( 1 , Ordering :: SeqCst ) ;
32- let mut last = self . last_failure . lock ( ) . unwrap ( ) ;
33- * last = Some ( Instant :: now ( ) ) ;
34- }
35-
36- fn record_success ( & self ) {
37- self . failures . store ( 0 , Ordering :: SeqCst ) ;
38- }
39-
40- fn is_open ( & self ) -> bool {
41- if self . failures . load ( Ordering :: SeqCst ) < CB_MAX_FAILURES {
42- return false ;
43- }
44- let last = self . last_failure . lock ( ) . unwrap ( ) ;
45- if let Some ( time) = * last {
46- if time. elapsed ( ) < CB_RESET_WINDOW {
47- return true ;
48- }
49- }
50- false
51- }
52- }
10+ const REQUEST_TIMEOUT : Duration = Duration :: from_secs ( 10 ) ;
5311
5412#[ derive( Debug , Clone , serde:: Serialize ) ]
5513pub struct Server {
5614 pub url : String ,
57- #[ serde( skip) ]
58- pub circuit_breaker : Arc < CircuitBreaker > ,
5915}
6016
6117#[ derive( Debug , Clone , serde:: Serialize ) ]
@@ -71,13 +27,14 @@ pub struct Servers {
7127 pub selected_cdn : RwLock < Option < Server > > ,
7228 pub selected_auth : RwLock < Option < Server > > ,
7329 pub connectivity_status : Mutex < ServerConnectivityStatus > ,
30+ pub check_complete_tx : watch:: Sender < bool > ,
31+ pub check_complete_rx : watch:: Receiver < bool > ,
7432}
7533
7634impl Server {
7735 pub fn new ( url : & str ) -> Self {
7836 Self {
7937 url : url. to_string ( ) ,
80- circuit_breaker : Arc :: new ( CircuitBreaker :: new ( ) ) ,
8138 }
8239 }
8340}
@@ -99,6 +56,8 @@ impl Servers {
9956 None
10057 } ;
10158
59+ let ( tx, rx) = watch:: channel ( false ) ;
60+
10261 Self {
10362 cdns,
10463 auths,
@@ -108,6 +67,8 @@ impl Servers {
10867 cdn_online : false ,
10968 auth_online : false ,
11069 } ) ,
70+ check_complete_tx : tx,
71+ check_complete_rx : rx,
11172 }
11273 }
11374
@@ -123,6 +84,15 @@ impl Servers {
12384 . await ;
12485
12586 self . set_status ( ) ;
87+ let _ = self . check_complete_tx . send ( true ) ;
88+ }
89+
90+ pub async fn wait_for_initial_check ( & self ) {
91+ let mut rx = self . check_complete_rx . clone ( ) ;
92+ if * rx. borrow_and_update ( ) {
93+ return ;
94+ }
95+ let _ = rx. changed ( ) . await ;
12696 }
12797
12898 async fn check_group (
@@ -133,51 +103,33 @@ impl Servers {
133103 name : & str ,
134104 ) {
135105 for server in servers {
136- if server. circuit_breaker . is_open ( ) {
137- log_warn ! ( "Skipping {} Server {}" , name, server. url) ;
138- continue ;
139- }
140-
141- let op = || async {
142- let resp = client. head ( & server. url ) . send ( ) . await . map_err ( |e| {
143- backoff:: Error :: < Box < dyn std:: error:: Error + Send + Sync > > :: transient ( Box :: new (
144- e,
145- ) )
146- } ) ?;
147- if !resp. status ( ) . is_success ( ) {
148- return Err (
149- backoff:: Error :: < Box < dyn std:: error:: Error + Send + Sync > > :: transient (
150- format ! ( "Status not success: {}" , resp. status( ) ) . into ( ) ,
151- ) ,
152- ) ;
153- }
154- Ok ( resp)
155- } ;
156-
157- let backoff = ExponentialBackoff {
158- max_elapsed_time : Some ( BACKOFF_MAX_ELAPSED ) ,
159- ..Default :: default ( )
160- } ;
161-
162- match retry ( backoff, op) . await {
163- Ok ( response) => {
164- log_info ! (
165- "{} Server {} responded with: {}" ,
166- name,
167- server. url,
168- response. status( )
169- ) ;
170- server. circuit_breaker . record_success ( ) ;
171- let mut lock = selected. write ( ) . unwrap ( ) ;
172- * lock = Some ( server. clone ( ) ) ;
173- return ;
106+ match client. head ( & server. url ) . send ( ) . await {
107+ Ok ( resp) => {
108+ if resp. status ( ) . is_success ( ) {
109+ log_info ! (
110+ "{} Server {} responded with: {}" ,
111+ name,
112+ server. url,
113+ resp. status( )
114+ ) ;
115+ let mut lock = selected. write ( ) . unwrap ( ) ;
116+ * lock = Some ( server. clone ( ) ) ;
117+ return ;
118+ } else {
119+ log_warn ! (
120+ "{} Server {} returned status: {}" ,
121+ name,
122+ server. url,
123+ resp. status( )
124+ ) ;
125+ }
174126 }
175127 Err ( e) => {
176128 log_warn ! ( "Failed to connect to {} Server {}: {}" , name, server. url, e) ;
177- server. circuit_breaker . record_failure ( ) ;
178129 }
179130 }
180131 }
132+
181133 let mut lock = selected. write ( ) . unwrap ( ) ;
182134 * lock = None ;
183135 }
0 commit comments