
Commit be4e5e9

Make living replica keep primary alive (#517)
Each request for new frames from a replica to the primary resets the primary's idle timer, but such a request is a long poll: it does not return until a new frame is available. In the meantime the replica sends no further requests, so the idle timer is never restarted. This commit makes the primary keep track of replicas waiting for new frames and prevents the IdleShutdownLayer from stopping the primary while at least one replica is still waiting. When a replica is put to sleep it drops the connection, which decrements the connected_replicas counter, so only live replicas keep the primary awake.

Fixes #506

Signed-off-by: Piotr Jastrzebski <[email protected]>
1 parent: b756dec
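The mechanism is a classic RAII counter: every live long poll holds a guard that raises a shared atomic count, and dropping the guard lowers it again, so the accounting cannot leak on an early return or a dropped connection. A minimal, self-contained sketch of the idea (the ReplicaGuard name and the main harness are illustrative, not part of the patch):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Illustrative guard: raises a shared counter while it exists and
// lowers it again on drop, as StreamGuard does for replica streams.
struct ReplicaGuard(Arc<AtomicUsize>);

impl ReplicaGuard {
    fn new(counter: Arc<AtomicUsize>) -> Self {
        counter.fetch_add(1, Ordering::SeqCst);
        Self(counter)
    }
}

impl Drop for ReplicaGuard {
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::SeqCst);
    }
}

fn main() {
    let connected = Arc::new(AtomicUsize::new(0));
    {
        let _replica = ReplicaGuard::new(connected.clone());
        // While the guard lives, the primary must stay up.
        assert_eq!(connected.load(Ordering::SeqCst), 1);
    } // replica put to sleep: connection and guard dropped
    assert_eq!(connected.load(Ordering::SeqCst), 0); // primary may idle out
}
```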

File tree (3 files changed, +66 −5):

sqld/src/rpc/mod.rs
sqld/src/rpc/replication_log.rs
sqld/src/utils/services/idle_shutdown.rs

sqld/src/rpc/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ pub async fn run_rpc_server<D: Database>(
     idle_shutdown_layer: Option<IdleShutdownLayer>,
 ) -> anyhow::Result<()> {
     let proxy_service = ProxyService::new(factory, logger.new_frame_notifier.subscribe());
-    let logger_service = ReplicationLogService::new(logger);
+    let logger_service = ReplicationLogService::new(logger, idle_shutdown_layer.clone());
 
     tracing::info!("serving write proxy server at {addr}");
 
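Passing idle_shutdown_layer.clone() into the service works because IdleShutdownLayer is a cheap handle: it derives Clone and holds its state behind Arcs, so every clone shares the same counter and watch sender. A small sketch of that sharing, with an illustrative Handle type standing in for the layer:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Same shape as IdleShutdownLayer's state: cloning copies the Arc
// pointers, not the values they guard.
#[derive(Clone)]
struct Handle {
    connected_replicas: Arc<AtomicUsize>,
}

fn main() {
    let original = Handle {
        connected_replicas: Arc::new(AtomicUsize::new(0)),
    };
    let clone = original.clone();

    clone.connected_replicas.fetch_add(1, Ordering::SeqCst);

    // An increment through the clone is visible through the original,
    // which is why the watcher task spawned in new() sees updates made
    // by the RPC service's copy of the layer.
    assert_eq!(original.connected_replicas.load(Ordering::SeqCst), 1);
}
```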
sqld/src/rpc/replication_log.rs

Lines changed: 49 additions & 4 deletions
@@ -15,23 +15,29 @@ use tonic::Status;
 
 use crate::replication::primary::frame_stream::FrameStream;
 use crate::replication::{LogReadError, ReplicationLogger};
+use crate::utils::services::idle_shutdown::IdleShutdownLayer;
 
 use self::rpc::replication_log_server::ReplicationLog;
 use self::rpc::{Frame, HelloRequest, HelloResponse, LogOffset};
 
 pub struct ReplicationLogService {
     logger: Arc<ReplicationLogger>,
     replicas_with_hello: RwLock<HashSet<SocketAddr>>,
+    idle_shutdown_layer: Option<IdleShutdownLayer>,
 }
 
 pub const NO_HELLO_ERROR_MSG: &str = "NO_HELLO";
 pub const NEED_SNAPSHOT_ERROR_MSG: &str = "NEED_SNAPSHOT";
 
 impl ReplicationLogService {
-    pub fn new(logger: Arc<ReplicationLogger>) -> Self {
+    pub fn new(
+        logger: Arc<ReplicationLogger>,
+        idle_shutdown_layer: Option<IdleShutdownLayer>,
+    ) -> Self {
         Self {
             logger,
             replicas_with_hello: RwLock::new(HashSet::<SocketAddr>::new()),
+            idle_shutdown_layer,
         }
     }
 }
@@ -56,6 +62,42 @@ fn map_frame_stream_output(
     }
 }
 
+pub struct StreamGuard<S> {
+    s: S,
+    idle_shutdown_layer: Option<IdleShutdownLayer>,
+}
+
+impl<S> StreamGuard<S> {
+    fn new(s: S, mut idle_shutdown_layer: Option<IdleShutdownLayer>) -> Self {
+        if let Some(isl) = idle_shutdown_layer.as_mut() {
+            isl.add_connected_replica()
+        }
+        Self {
+            s,
+            idle_shutdown_layer,
+        }
+    }
+}
+
+impl<S> Drop for StreamGuard<S> {
+    fn drop(&mut self) {
+        if let Some(isl) = self.idle_shutdown_layer.as_mut() {
+            isl.remove_connected_replica()
+        }
+    }
+}
+
+impl<S: futures::stream::Stream + Unpin> futures::stream::Stream for StreamGuard<S> {
+    type Item = S::Item;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        self.get_mut().s.poll_next_unpin(cx)
+    }
+}
+
 #[tonic::async_trait]
 impl ReplicationLog for ReplicationLogService {
     type LogEntriesStream = BoxStream<'static, Result<Frame, Status>>;
@@ -75,9 +117,12 @@ impl ReplicationLog for ReplicationLogService {
             }
         }
 
-        let stream = FrameStream::new(self.logger.clone(), req.into_inner().next_offset)
-            .map(map_frame_stream_output)
-            .boxed();
+        let stream = StreamGuard::new(
+            FrameStream::new(self.logger.clone(), req.into_inner().next_offset),
+            self.idle_shutdown_layer.clone(),
+        )
+        .map(map_frame_stream_output)
+        .boxed();
 
         Ok(tonic::Response::new(stream))
     }

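StreamGuard is deliberately transparent: poll_next only delegates to the wrapped stream, so the guard adds no buffering and its lifetime simply tracks the boxed response stream that tonic holds open for the duration of the long poll. A hedged, self-contained sketch of the same delegation pattern (the Traced wrapper and the frame values are invented for the demo):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::stream::{Stream, StreamExt};

// Illustrative wrapper: forwards every poll to the inner stream and
// reports when it is dropped, the way StreamGuard's lifetime tracks
// the replica's long poll.
struct Traced<S> {
    inner: S,
}

impl<S> Drop for Traced<S> {
    fn drop(&mut self) {
        println!("stream dropped: replica disconnected");
    }
}

impl<S: Stream + Unpin> Stream for Traced<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Pure pass-through, the same shape as StreamGuard::poll_next.
        self.get_mut().inner.poll_next_unpin(cx)
    }
}

fn main() {
    futures::executor::block_on(async {
        // Wrap first, then map and box, as log_entries does: the
        // combinators own the wrapper, so dropping the boxed stream
        // drops the wrapper too.
        let mut frames = Traced {
            inner: futures::stream::iter(vec![1u64, 2, 3]),
        }
        .map(|frame| frame * 10)
        .boxed();

        while let Some(frame) = frames.next().await {
            println!("frame {frame}");
        }
        drop(frames); // prints "stream dropped: replica disconnected"
    });
}
```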
sqld/src/utils/services/idle_shutdown.rs

Lines changed: 16 additions & 0 deletions
@@ -1,3 +1,4 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -9,11 +10,14 @@ use tower::{Layer, Service};
 #[derive(Clone)]
 pub struct IdleShutdownLayer {
     watcher: Arc<watch::Sender<()>>,
+    connected_replicas: Arc<AtomicUsize>,
 }
 
 impl IdleShutdownLayer {
     pub fn new(idle_timeout: Duration, shutdown_notifier: mpsc::Sender<()>) -> Self {
         let (sender, mut receiver) = watch::channel(());
+        let connected_replicas = Arc::new(AtomicUsize::new(0));
+        let connected_replicas_clone = connected_replicas.clone();
         tokio::spawn(async move {
             loop {
                 // FIXME: if we measure that this is causing performance issues, we may want to
@@ -23,6 +27,9 @@ impl IdleShutdownLayer {
                     Ok(Ok(_)) => continue,
                     Ok(Err(_)) => break,
                     Err(_) => {
+                        if connected_replicas_clone.load(Ordering::SeqCst) > 0 {
+                            continue;
+                        }
                         tracing::info!(
                             "Idle timeout, no new connection in {idle_timeout:.0?}. Shutting down.",
                         );
@@ -39,9 +46,18 @@ impl IdleShutdownLayer {
 
         Self {
             watcher: Arc::new(sender),
+            connected_replicas,
         }
     }
 
+    pub fn add_connected_replica(&mut self) {
+        self.connected_replicas.fetch_add(1, Ordering::SeqCst);
+    }
+
+    pub fn remove_connected_replica(&mut self) {
+        self.connected_replicas.fetch_sub(1, Ordering::SeqCst);
+    }
+
     pub fn into_kicker(self) -> IdleKicker {
         IdleKicker {
             sender: self.watcher,
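The shutdown task keeps its original structure: the watch channel reports request activity, tokio::time::timeout bounds the wait, and the new counter check turns an expired timer into a no-op while any replica is connected. A rough, self-contained approximation of that loop (the channel wiring, timings, and println are invented for the demo):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::watch;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let (activity_tx, mut activity_rx) = watch::channel(());
    let connected_replicas = Arc::new(AtomicUsize::new(1)); // one long-polling replica
    let replicas = connected_replicas.clone();
    let idle_timeout = Duration::from_millis(50);

    let watcher = tokio::spawn(async move {
        loop {
            match timeout(idle_timeout, activity_rx.changed()).await {
                Ok(Ok(_)) => continue, // a request arrived: the timer restarts
                Ok(Err(_)) => break,   // every sender is gone: nothing left to watch
                Err(_) => {
                    // Timer expired, but a replica is still waiting for
                    // frames: skip the shutdown, as the patched loop does.
                    if replicas.load(Ordering::SeqCst) > 0 {
                        continue;
                    }
                    println!("idle timeout: shutting down");
                    break;
                }
            }
        }
    });

    // Several idle periods elapse while the replica stays connected...
    tokio::time::sleep(Duration::from_millis(160)).await;
    // ...then the replica goes to sleep and the next expiry wins.
    connected_replicas.store(0, Ordering::SeqCst);
    watcher.await.unwrap();
    drop(activity_tx); // kept alive so changed() does not error early
}
```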
