Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit be325ec

Browse files
Merge #323
323: Replica connection wait for replication r=MarinPostma a=MarinPostma What this PR does is quite simple: it makes every replica database connection wait for the replica instance to catch up with the last write that the connection performed. When a replica detects a write, the write is proxied to the primary, which executes it and then starts replicating the state. The replica receives a response that contains a logical timestamp issued after the write was performed. The replica's connection keeps this timestamp and, on subsequent operations on the _local_ connection, waits until the replicator has applied changes up to the stored timestamp. Note that each connection holds a different timestamp, so one connection may proceed with a read while another is still waiting. The basic guarantee that this gives is read-your-writes: a connection is always guaranteed to witness its own writes. In fact, the guarantee is much stronger: because of the underlying consistency of SQLite, the real-time consistency model of sqld is now as strong as `sequential consistency`. Co-authored-by: ad hoc <[email protected]>
2 parents f67e067 + 5ff2e2b commit be325ec

File tree

7 files changed

+86
-7
lines changed

7 files changed

+86
-7
lines changed

sqld/proto/proxy.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ message ExecuteResults {
8989
}
9090
/// State after executing the queries
9191
State state = 2;
92+
/// Primary frame_no after executing the request.
93+
uint64 current_frame_no = 3;
9294
}
9395

9496
message Program {

sqld/src/database/write_proxy.rs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
use std::path::PathBuf;
22
use std::sync::Arc;
33

4-
use tokio::sync::Mutex;
4+
use parking_lot::Mutex as PMutex;
5+
use tokio::sync::{watch, Mutex};
56
use tonic::transport::Channel;
67
use uuid::Uuid;
78

89
use crate::auth::{Authenticated, Authorized};
910
use crate::error::Error;
1011
use crate::query::{QueryResponse, QueryResult};
1112
use crate::query_analysis::State;
13+
use crate::replication::FrameNo;
1214
use crate::rpc::proxy::rpc::proxy_client::ProxyClient;
1315
use crate::rpc::proxy::rpc::query_result::RowResult;
1416
use crate::rpc::proxy::rpc::DisconnectMessage;
@@ -23,6 +25,7 @@ pub struct WriteProxyDbFactory {
2325
client: ProxyClient<Channel>,
2426
db_path: PathBuf,
2527
stats: Stats,
28+
applied_frame_no_receiver: watch::Receiver<FrameNo>,
2629
}
2730

2831
impl WriteProxyDbFactory {
@@ -31,12 +34,14 @@ impl WriteProxyDbFactory {
3134
channel: Channel,
3235
uri: tonic::transport::Uri,
3336
stats: Stats,
37+
applied_frame_no_receiver: watch::Receiver<FrameNo>,
3438
) -> Self {
3539
let client = ProxyClient::with_origin(channel, uri);
3640
Self {
3741
client,
3842
db_path,
3943
stats,
44+
applied_frame_no_receiver,
4045
}
4146
}
4247
}
@@ -48,6 +53,7 @@ impl DbFactory for WriteProxyDbFactory {
4853
self.client.clone(),
4954
self.db_path.clone(),
5055
self.stats.clone(),
56+
self.applied_frame_no_receiver.clone(),
5157
)?;
5258
Ok(Arc::new(db))
5359
}
@@ -58,16 +64,29 @@ pub struct WriteProxyDatabase {
5864
write_proxy: ProxyClient<Channel>,
5965
state: Mutex<State>,
6066
client_id: Uuid,
67+
/// FrameNo of the last write performed by this connection on the primary.
68+
/// any subsequent read on this connection must wait for the replicator to catch up with this
69+
/// frame_no
70+
last_write_frame_no: PMutex<FrameNo>,
71+
/// Notifier from the replicator of the currently applied frame_no
72+
applied_frame_no_receiver: watch::Receiver<FrameNo>,
6173
}
6274

6375
impl WriteProxyDatabase {
64-
fn new(write_proxy: ProxyClient<Channel>, path: PathBuf, stats: Stats) -> Result<Self> {
76+
fn new(
77+
write_proxy: ProxyClient<Channel>,
78+
path: PathBuf,
79+
stats: Stats,
80+
applied_frame_no_receiver: watch::Receiver<FrameNo>,
81+
) -> Result<Self> {
6582
let read_db = LibSqlDb::new(path, (), false, stats)?;
6683
Ok(Self {
6784
read_db,
6885
write_proxy,
6986
state: Mutex::new(State::Init),
7087
client_id: Uuid::new_v4(),
88+
last_write_frame_no: PMutex::new(FrameNo::MAX),
89+
applied_frame_no_receiver,
7190
})
7291
}
7392

@@ -104,6 +123,8 @@ impl WriteProxyDatabase {
104123
})
105124
.collect();
106125

126+
self.update_last_write_frame_no(execute_result.current_frame_no);
127+
107128
Ok((results, *state))
108129
}
109130
Err(e) => {
@@ -114,6 +135,35 @@ impl WriteProxyDatabase {
114135
}
115136
}
116137
}
138+
139+
fn update_last_write_frame_no(&self, new_frame_no: FrameNo) {
140+
let mut last_frame_no = self.last_write_frame_no.lock();
141+
if *last_frame_no == FrameNo::MAX || new_frame_no > *last_frame_no {
142+
*last_frame_no = new_frame_no
143+
}
144+
}
145+
146+
/// wait for the replicator to have caught up with our current write frame_no
147+
async fn wait_replication_sync(&self) -> Result<()> {
148+
let current_frame_no = *self.last_write_frame_no.lock();
149+
150+
if current_frame_no == FrameNo::MAX {
151+
return Ok(());
152+
}
153+
154+
let mut receiver = self.applied_frame_no_receiver.clone();
155+
let mut last_applied = *receiver.borrow_and_update();
156+
157+
while last_applied < current_frame_no {
158+
receiver
159+
.changed()
160+
.await
161+
.map_err(|_| Error::ReplicatorExited)?;
162+
last_applied = *receiver.borrow_and_update();
163+
}
164+
165+
Ok(())
166+
}
117167
}
118168

119169
#[async_trait::async_trait]
@@ -125,6 +175,7 @@ impl Database for WriteProxyDatabase {
125175
) -> Result<(Vec<Option<QueryResult>>, State)> {
126176
let mut state = self.state.lock().await;
127177
if *state == State::Init && pgm.is_read_only() {
178+
self.wait_replication_sync().await?;
128179
// We know that this program won't perform any writes. We attempt to run it on the
129180
// replica. If it leaves an open transaction, then this program is an interactive
130181
// transaction, so we rollback the replica, and execute again on the primary.

sqld/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ pub enum Error {
2525
InvalidBatchStep(usize),
2626
#[error("Not authorized to execute query: {0}")]
2727
NotAuthorized(String),
28+
#[error("The replicator exited, instance cannot make any progress.")]
29+
ReplicatorExited,
2830
}
2931

3032
impl From<tokio::sync::oneshot::error::RecvError> for Error {

sqld/src/lib.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,10 +201,17 @@ async fn start_replica(
201201
) -> anyhow::Result<()> {
202202
let (channel, uri) = configure_rpc(config)?;
203203
let replicator = Replicator::new(config.db_path.clone(), channel.clone(), uri.clone());
204+
let applied_frame_no_receiver = replicator.current_frame_no_notifier.subscribe();
204205

205206
join_set.spawn(replicator.run());
206207

207-
let factory = WriteProxyDbFactory::new(config.db_path.clone(), channel, uri, stats.clone());
208+
let factory = WriteProxyDbFactory::new(
209+
config.db_path.clone(),
210+
channel,
211+
uri,
212+
stats.clone(),
213+
applied_frame_no_receiver,
214+
);
208215
run_service(
209216
Arc::new(factory),
210217
config,

sqld/src/replication/replica/replicator.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::time::Duration;
33

44
use anyhow::bail;
55
use futures::StreamExt;
6+
use tokio::sync::watch;
67
use tonic::transport::Channel;
78

89
use crate::replication::frame::Frame;
@@ -27,16 +28,19 @@ pub struct Replicator {
2728
db_path: PathBuf,
2829
injector: Option<FrameInjectorHandle>,
2930
current_frame_no: FrameNo,
31+
pub current_frame_no_notifier: watch::Sender<FrameNo>,
3032
}
3133

3234
impl Replicator {
3335
pub fn new(db_path: PathBuf, channel: Channel, uri: tonic::transport::Uri) -> Self {
3436
let client = Client::with_origin(channel, uri);
37+
let (applied_frame_notifier, _) = watch::channel(FrameNo::MAX);
3538
Self {
3639
client,
3740
db_path,
3841
injector: None,
3942
current_frame_no: FrameNo::MAX,
43+
current_frame_no_notifier: applied_frame_notifier,
4044
}
4145
}
4246

@@ -72,7 +76,7 @@ impl Replicator {
7276
}
7377
let (injector, last_applied_frame_no) =
7478
FrameInjectorHandle::new(self.db_path.clone(), hello).await?;
75-
self.current_frame_no = last_applied_frame_no;
79+
self.update_current_frame_no(last_applied_frame_no);
7680
self.injector.replace(injector);
7781
return Ok(());
7882
}
@@ -88,6 +92,11 @@ impl Replicator {
8892
bail!("couldn't connect to primary after {HANDSHAKE_MAX_RETRIES} tries.");
8993
}
9094

95+
fn update_current_frame_no(&mut self, new_frame_no: FrameNo) {
96+
self.current_frame_no = new_frame_no;
97+
self.current_frame_no_notifier.send_replace(new_frame_no);
98+
}
99+
91100
async fn replicate(&mut self) -> anyhow::Result<()> {
92101
let offset = LogOffset {
93102
// if current == FrameNo::MAX then it means that we're starting fresh
@@ -142,13 +151,15 @@ impl Replicator {
142151
}
143152

144153
async fn flush_txn(&mut self, frames: Vec<Frame>) -> anyhow::Result<()> {
145-
self.current_frame_no = self
154+
let new_frame_no = self
146155
.injector
147156
.as_mut()
148157
.expect("invalid state")
149158
.apply_frames(Frames::Vec(frames))
150159
.await?;
151160

161+
self.update_current_frame_no(new_frame_no);
162+
152163
Ok(())
153164
}
154165

sqld/src/rpc/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub async fn run_rpc_server(
2626
logger: Arc<ReplicationLogger>,
2727
idle_shutdown_layer: Option<IdleShutdownLayer>,
2828
) -> anyhow::Result<()> {
29-
let proxy_service = ProxyService::new(factory);
29+
let proxy_service = ProxyService::new(factory, logger.new_frame_notifier.subscribe());
3030
let logger_service = ReplicationLogService::new(logger);
3131

3232
tracing::info!("serving write proxy server at {addr}");

sqld/src/rpc/proxy.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ use std::str::FromStr;
33
use std::sync::Arc;
44

55
use async_lock::{RwLock, RwLockUpgradableReadGuard};
6+
use tokio::sync::watch;
67
use uuid::Uuid;
78

89
use crate::auth::{Authenticated, Authorized};
910
use crate::database::factory::DbFactory;
1011
use crate::database::{Database, Program};
12+
use crate::replication::FrameNo;
1113

1214
use self::rpc::execute_results::State;
1315
use self::rpc::proxy_server::Proxy;
@@ -272,13 +274,15 @@ pub mod rpc {
272274
pub struct ProxyService {
273275
clients: RwLock<HashMap<Uuid, Arc<dyn Database>>>,
274276
factory: Arc<dyn DbFactory>,
277+
new_frame_notifier: watch::Receiver<FrameNo>,
275278
}
276279

277280
impl ProxyService {
278-
pub fn new(factory: Arc<dyn DbFactory>) -> Self {
281+
pub fn new(factory: Arc<dyn DbFactory>, new_frame_notifier: watch::Receiver<FrameNo>) -> Self {
279282
Self {
280283
clients: Default::default(),
281284
factory,
285+
new_frame_notifier,
282286
}
283287
}
284288
}
@@ -323,10 +327,12 @@ impl Proxy for ProxyService {
323327
.await
324328
.map_err(|e| tonic::Status::new(tonic::Code::PermissionDenied, e.to_string()))?;
325329
let results = results.into_iter().map(|r| r.into()).collect();
330+
let current_frame_no = *self.new_frame_notifier.borrow();
326331

327332
Ok(tonic::Response::new(ExecuteResults {
328333
results,
329334
state: State::from(state).into(),
335+
current_frame_no,
330336
}))
331337
}
332338

0 commit comments

Comments
 (0)