Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 7fb80c2

Browse files
authored
Introduce initial-idle-shutdown-timeout-s (#528)
* hrana: Kick IdleKicker only after actual request is received Previously Hrana was kicking IdleKicker at startup, but that's no longer acceptable because we want to be able to have different idle shutdown timeouts for a startup that does not receive any requests and a startup that receives some requests. Signed-off-by: Piotr Jastrzebski <[email protected]> * Restructure IdleShutdownLayer::new This change only restructures IdleShutdownLayer::new. It keeps the previous behaviour. The new structure of the code will allow the actual changes to be introduced in the next commit in a much less invasive way. Signed-off-by: Piotr Jastrzebski <[email protected]> * Add initial-idle-shutdown-timeout-s This new parameter makes it possible to define a different idle shutdown timeout for an instance that has been started but has never received any requests. It is useful for maintenance operations like periodically syncing a replica with the primary even when the replica does not serve any requests at the moment. The assumption is that such a maintenance operation could wake up the replica for a much shorter time than the idle-shutdown-timeout used after receiving an actual user request. Signed-off-by: Piotr Jastrzebski <[email protected]> --------- Signed-off-by: Piotr Jastrzebski <[email protected]>
1 parent 8b75dc8 commit 7fb80c2

File tree

5 files changed

+45
-28
lines changed

5 files changed

+45
-28
lines changed

sqld/src/hrana/ws/conn.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,6 @@ async fn handle_ws<D: Database>(
8282
};
8383

8484
loop {
85-
if let Some(kicker) = conn.server.idle_kicker.as_ref() {
86-
kicker.kick();
87-
}
88-
8985
tokio::select! {
9086
Some(client_msg_res) = conn.ws.recv() => {
9187
let client_msg = client_msg_res
@@ -122,6 +118,10 @@ async fn handle_ws<D: Database>(
122118
},
123119
else => break,
124120
}
121+
122+
if let Some(kicker) = conn.server.idle_kicker.as_ref() {
123+
kicker.kick();
124+
}
125125
}
126126

127127
close(

sqld/src/hrana/ws/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,6 @@ pub async fn serve(
5050

5151
let mut join_set = tokio::task::JoinSet::new();
5252
loop {
53-
if let Some(kicker) = server.idle_kicker.as_ref() {
54-
kicker.kick();
55-
}
56-
5753
tokio::select! {
5854
Some(accept) = accept_rx.recv() => {
5955
let conn_id = server.next_conn_id.fetch_add(1, Ordering::AcqRel);
@@ -85,6 +81,10 @@ pub async fn serve(
8581
return Ok(())
8682
}
8783
}
84+
85+
if let Some(kicker) = server.idle_kicker.as_ref() {
86+
kicker.kick();
87+
}
8888
}
8989
}
9090

sqld/src/lib.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ pub struct Config {
9292
pub rpc_server_ca_cert: Option<PathBuf>,
9393
pub bottomless_replication: Option<bottomless::replicator::Options>,
9494
pub idle_shutdown_timeout: Option<Duration>,
95+
pub initial_idle_shutdown_timeout: Option<Duration>,
9596
pub load_from_dump: Option<PathBuf>,
9697
pub max_log_size: u64,
9798
pub max_log_duration: Option<f32>,
@@ -130,6 +131,7 @@ impl Default for Config {
130131
rpc_server_ca_cert: None,
131132
bottomless_replication: None,
132133
idle_shutdown_timeout: None,
134+
initial_idle_shutdown_timeout: None,
133135
load_from_dump: None,
134136
max_log_size: 200,
135137
max_log_duration: None,
@@ -655,9 +657,13 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> {
655657
Ok(())
656658
});
657659

658-
let idle_shutdown_layer = config
659-
.idle_shutdown_timeout
660-
.map(|d| IdleShutdownLayer::new(d, shutdown_sender.clone()));
660+
let idle_shutdown_layer = config.idle_shutdown_timeout.map(|d| {
661+
IdleShutdownLayer::new(
662+
d,
663+
config.initial_idle_shutdown_timeout,
664+
shutdown_sender.clone(),
665+
)
666+
});
661667

662668
let stats = Stats::new(&config.db_path)?;
663669

sqld/src/main.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,12 @@ struct Cli {
122122
#[clap(long, env = "SQLD_IDLE_SHUTDOWN_TIMEOUT_S")]
123123
idle_shutdown_timeout_s: Option<u64>,
124124

125+
/// Like idle_shutdown_timeout_s but used only once after the server is started.
126+
/// After that server either is shut down because it does not receive any requests
127+
/// or idle_shutdown_timeout_s is used moving forward.
128+
#[clap(long, env = "SQLD_INITIAL_IDLE_SHUTDOWN_TIMEOUT_S")]
129+
initial_idle_shutdown_timeout_s: Option<u64>,
130+
125131
/// Load the dump at the provided path.
126132
/// Requires that the node is not in replica mode
127133
#[clap(long, env = "SQLD_LOAD_DUMP_PATH", conflicts_with = "primary_grpc_url")]
@@ -272,6 +278,9 @@ fn config_from_args(args: Cli) -> Result<Config> {
272278
None
273279
},
274280
idle_shutdown_timeout: args.idle_shutdown_timeout_s.map(Duration::from_secs),
281+
initial_idle_shutdown_timeout: args
282+
.initial_idle_shutdown_timeout_s
283+
.map(Duration::from_secs),
275284
load_from_dump: args.load_from_dump,
276285
max_log_size: args.max_log_size,
277286
max_log_duration: args.max_log_duration,

sqld/src/utils/services/idle_shutdown.rs

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,31 +14,33 @@ pub struct IdleShutdownLayer {
1414
}
1515

1616
impl IdleShutdownLayer {
17-
pub fn new(idle_timeout: Duration, shutdown_notifier: mpsc::Sender<()>) -> Self {
17+
pub fn new(
18+
idle_timeout: Duration,
19+
initial_idle_timeout: Option<Duration>,
20+
shutdown_notifier: mpsc::Sender<()>,
21+
) -> Self {
1822
let (sender, mut receiver) = watch::channel(());
1923
let connected_replicas = Arc::new(AtomicUsize::new(0));
2024
let connected_replicas_clone = connected_replicas.clone();
25+
let mut sleep_time = initial_idle_timeout.unwrap_or(idle_timeout);
2126
tokio::spawn(async move {
2227
loop {
2328
// FIXME: if we measure that this is causing performance issues, we may want to
2429
// implement some debouncing.
25-
let timeout_fut = timeout(idle_timeout, receiver.changed());
26-
match timeout_fut.await {
27-
Ok(Ok(_)) => continue,
28-
Ok(Err(_)) => break,
29-
Err(_) => {
30-
if connected_replicas_clone.load(Ordering::SeqCst) > 0 {
31-
continue;
32-
}
33-
tracing::info!(
34-
"Idle timeout, no new connection in {idle_timeout:.0?}. Shutting down.",
35-
);
36-
shutdown_notifier
37-
.send(())
38-
.await
39-
.expect("failed to shutdown gracefully");
40-
}
30+
let timeout_res = timeout(sleep_time, receiver.changed()).await;
31+
if let Ok(Err(_)) = timeout_res {
32+
break;
4133
}
34+
if timeout_res.is_err() && connected_replicas_clone.load(Ordering::SeqCst) == 0 {
35+
tracing::info!(
36+
"Idle timeout, no new connection in {sleep_time:.0?}. Shutting down.",
37+
);
38+
shutdown_notifier
39+
.send(())
40+
.await
41+
.expect("failed to shutdown gracefully");
42+
}
43+
sleep_time = idle_timeout;
4244
}
4345

4446
tracing::debug!("idle shutdown loop exited");

0 commit comments

Comments
 (0)