Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit f8beda2

Browse files
committed
fix remote execution
1 parent 5fc49e1 commit f8beda2

File tree

8 files changed

+112
-70
lines changed

8 files changed

+112
-70
lines changed

sqld/src/database/libsql.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,8 @@ impl Connection {
207207
fn run(&mut self, pgm: Program) -> Vec<Option<QueryResult>> {
208208
let mut results = Vec::with_capacity(pgm.steps.len());
209209

210-
for step in pgm.steps {
211-
let res = self.execute_step(&step, &results);
210+
for step in pgm.steps() {
211+
let res = self.execute_step(step, &results);
212212
results.push(res);
213213
}
214214

sqld/src/database/mod.rs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use crate::query::{Params, Query, QueryResult};
24
use crate::query_analysis::{State, Statement};
35
use crate::Result;
@@ -9,24 +11,34 @@ pub mod write_proxy;
911

1012
const TXN_TIMEOUT_SECS: u64 = 5;
1113

12-
#[derive(Debug)]
14+
#[derive(Debug, Clone)]
1315
pub struct Program {
14-
pub steps: Vec<Step>,
16+
pub steps: Arc<Vec<Step>>,
1517
}
1618

1719
impl Program {
20+
pub fn new(steps: Vec<Step>) -> Self {
21+
Self {
22+
steps: Arc::new(steps),
23+
}
24+
}
25+
1826
pub fn is_read_only(&self) -> bool {
1927
self.steps.iter().all(|s| s.query.stmt.is_read_only())
2028
}
29+
30+
pub fn steps(&self) -> &[Step] {
31+
self.steps.as_slice()
32+
}
2133
}
2234

23-
#[derive(Debug)]
35+
#[derive(Debug, Clone)]
2436
pub struct Step {
2537
pub cond: Option<Cond>,
2638
pub query: Query,
2739
}
2840

29-
#[derive(Debug)]
41+
#[derive(Debug, Clone)]
3042
pub enum Cond {
3143
Ok { step: usize },
3244
Err { step: usize },
@@ -42,9 +54,7 @@ pub trait Database: Send + Sync {
4254

4355
/// Unconditionnaly execute a query as part of a program
4456
async fn execute_one(&self, query: Query) -> Result<(QueryResult, State)> {
45-
let pgm = Program {
46-
steps: vec![Step { cond: None, query }],
47-
};
57+
let pgm = Program::new(vec![Step { cond: None, query }]);
4858

4959
let (results, state) = self.execute_program(pgm).await?;
5060
Ok((results.into_iter().next().unwrap().unwrap(), state))
@@ -85,12 +95,25 @@ pub trait Database: Send + Sync {
8595
})
8696
}
8797

88-
let pgm = Program { steps };
98+
let pgm = Program::new(steps);
8999

90100
let (mut results, state) = self.execute_program(pgm).await?;
91101
// remove the rollback result
92102
results.pop();
93103

94104
Ok((results, state))
95105
}
106+
107+
async fn rollback(&self) -> Result<()> {
108+
let (results, _) = self
109+
.execute_one(Query {
110+
stmt: Statement::parse("ROLLBACK").next().unwrap().unwrap(),
111+
params: Params::empty(),
112+
})
113+
.await?;
114+
115+
results?;
116+
117+
Ok(())
118+
}
96119
}

sqld/src/database/write_proxy.rs

Lines changed: 50 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use uuid::Uuid;
1010

1111
use crate::error::Error;
1212
use crate::query::{QueryResponse, QueryResult};
13-
use crate::query_analysis::{final_state, State};
13+
use crate::query_analysis::State;
1414
use crate::replication::client::PeriodicDbUpdater;
1515
use crate::rpc::proxy::rpc::proxy_client::ProxyClient;
1616
use crate::rpc::proxy::rpc::query_result::RowResult;
@@ -115,50 +115,62 @@ impl WriteProxyDatabase {
115115
client_id: Uuid::new_v4(),
116116
})
117117
}
118+
119+
async fn execute_remote(
120+
&self,
121+
pgm: Program,
122+
state: &mut State,
123+
) -> Result<(Vec<Option<QueryResult>>, State)> {
124+
let mut client = self.write_proxy.clone();
125+
let req = crate::rpc::proxy::rpc::ProgramReq {
126+
client_id: self.client_id.to_string(),
127+
pgm: Some(pgm.into()),
128+
};
129+
match client.execute(req).await {
130+
Ok(r) => {
131+
let execute_result = r.into_inner();
132+
*state = execute_result.state().into();
133+
let results = execute_result
134+
.results
135+
.into_iter()
136+
.map(|r| -> Option<QueryResult> {
137+
let result = r.row_result?;
138+
match result {
139+
RowResult::Row(res) => Some(Ok(QueryResponse::ResultSet(res.into()))),
140+
RowResult::Error(e) => Some(Err(Error::RpcQueryError(e))),
141+
}
142+
})
143+
.collect();
144+
145+
Ok((results, *state))
146+
}
147+
Err(e) => {
148+
// Set state to invalid, so next call is sent to remote, and we have a chance
149+
// to recover state.
150+
*state = State::Invalid;
151+
Err(Error::RpcQueryExecutionError(e))
152+
}
153+
}
154+
}
118155
}
119156

120157
#[async_trait::async_trait]
121158
impl Database for WriteProxyDatabase {
122159
async fn execute_program(&self, pgm: Program) -> Result<(Vec<Option<QueryResult>>, State)> {
123160
let mut state = self.state.lock().await;
124-
if *state == State::Init
125-
&& pgm.is_read_only()
126-
&& final_state(*state, pgm.steps.iter().map(|s| &s.query.stmt)) == State::Init
127-
{
128-
self.read_db.execute_program(pgm).await
129-
} else {
130-
let mut client = self.write_proxy.clone();
131-
let req = crate::rpc::proxy::rpc::ProgramReq {
132-
client_id: self.client_id.to_string(),
133-
pgm: Some(pgm.into()),
134-
};
135-
match client.execute(req).await {
136-
Ok(r) => {
137-
let execute_result = r.into_inner();
138-
*state = execute_result.state().into();
139-
let results = execute_result
140-
.results
141-
.into_iter()
142-
.map(|r| -> Option<QueryResult> {
143-
let result = r.row_result?;
144-
match result {
145-
RowResult::Row(res) => {
146-
Some(Ok(QueryResponse::ResultSet(res.into())))
147-
}
148-
RowResult::Error(e) => Some(Err(Error::RpcQueryError(e))),
149-
}
150-
})
151-
.collect();
152-
153-
Ok((results, *state))
154-
}
155-
Err(e) => {
156-
// Set state to invalid, so next call is sent to remote, and we have a chance
157-
// to recover state.
158-
*state = State::Invalid;
159-
Err(Error::RpcQueryExecutionError(e))
160-
}
161+
if *state == State::Init && pgm.is_read_only() {
162+
// We know that this program won't perform any writes. We attempt to run it on the
163+
// replica. If it leaves an open transaction, then this program is an interactive
164+
// transaction, so we rollback the replica, and execute again on the primary.
165+
let (results, new_state) = self.read_db.execute_program(pgm.clone()).await?;
166+
if new_state != State::Init {
167+
self.read_db.rollback().await?;
168+
self.execute_remote(pgm, &mut state).await
169+
} else {
170+
Ok((results, new_state))
161171
}
172+
} else {
173+
self.execute_remote(pgm, &mut state).await
162174
}
163175
}
164176
}

sqld/src/hrana/batch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ fn batch_to_program(batch: &proto::Batch) -> Result<Program> {
6868
steps.push(step);
6969
}
7070

71-
Ok(Program { steps })
71+
Ok(Program::new(steps))
7272
}
7373

7474
pub async fn execute_batch(db: &dyn Database, batch: &proto::Batch) -> Result<proto::BatchResult> {

sqld/src/http/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::error::Error;
2828
use crate::hrana;
2929
use crate::http::types::HttpQuery;
3030
use crate::query::{self, Query, QueryResult, ResultSet};
31-
use crate::query_analysis::{final_state, State, Statement};
31+
use crate::query_analysis::{predict_final_state, State, Statement};
3232
use crate::utils::services::idle_shutdown::IdleShutdownLayer;
3333

3434
use self::types::QueryObject;
@@ -140,7 +140,7 @@ fn parse_queries(queries: Vec<QueryObject>) -> anyhow::Result<Vec<Query>> {
140140
out.push(query);
141141
}
142142

143-
match final_state(State::Init, out.iter().map(|q| &q.stmt)) {
143+
match predict_final_state(State::Init, out.iter().map(|q| &q.stmt)) {
144144
State::Txn => anyhow::bail!("interactive transaction not allowed in HTTP queries"),
145145
State::Init => (),
146146
// maybe we should err here, but let's sqlite deal with that.

sqld/src/query.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ pub enum QueryResponse {
271271
ResultSet(ResultSet),
272272
}
273273

274-
#[derive(Debug)]
274+
#[derive(Debug, Clone)]
275275
pub struct Query {
276276
pub stmt: Statement,
277277
pub params: Params,
@@ -291,7 +291,7 @@ impl ToSql for Value {
291291
}
292292
}
293293

294-
#[derive(Debug, Serialize)]
294+
#[derive(Debug, Serialize, Clone)]
295295
pub enum Params {
296296
Named(HashMap<String, Value>),
297297
Positional(Vec<Value>),

sqld/src/query_analysis.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use sqlite3_parser::{
66
};
77

88
/// A group of statements to be executed together.
9-
#[derive(Debug)]
9+
#[derive(Debug, Clone)]
1010
pub struct Statement {
1111
pub stmt: String,
1212
pub kind: StmtKind,
@@ -70,11 +70,7 @@ pub enum State {
7070
impl State {
7171
pub fn step(&mut self, kind: StmtKind) {
7272
*self = match (*self, kind) {
73-
// those two transition will cause an error, but since we are interested in what the
74-
// pesimistic final state is, and we will adjust when we get the actual state back.
75-
(State::Txn, StmtKind::TxnBegin) => State::Txn,
76-
(State::Init, StmtKind::TxnEnd) => State::Init,
77-
73+
(State::Txn, StmtKind::TxnBegin) | (State::Init, StmtKind::TxnEnd) => State::Invalid,
7874
(State::Txn, StmtKind::TxnEnd) => State::Init,
7975
(state, StmtKind::Other | StmtKind::Write | StmtKind::Read) => state,
8076
(State::Invalid, _) => State::Invalid,
@@ -145,9 +141,12 @@ impl Statement {
145141
}
146142
}
147143

148-
/// Given a an initial state and an array of queries, return the final state obtained if all the
149-
/// queries succeeded
150-
pub fn final_state<'a>(mut state: State, stmts: impl Iterator<Item = &'a Statement>) -> State {
144+
/// Given a an initial state and an array of queries, attempts to predict what the final state will
145+
/// be
146+
pub fn predict_final_state<'a>(
147+
mut state: State,
148+
stmts: impl Iterator<Item = &'a Statement>,
149+
) -> State {
151150
for stmt in stmts {
152151
state.step(stmt.kind);
153152
}

sqld/src/rpc/proxy.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use self::rpc::{Ack, DisconnectMessage, ExecuteResults};
1515
pub mod rpc {
1616
#![allow(clippy::all)]
1717

18+
use std::sync::Arc;
19+
1820
use anyhow::Context;
1921

2022
use crate::query::QueryResponse;
@@ -145,13 +147,13 @@ pub mod rpc {
145147
type Error = anyhow::Error;
146148

147149
fn try_from(pgm: Program) -> Result<Self, Self::Error> {
148-
Ok(Self {
149-
steps: pgm
150-
.steps
151-
.into_iter()
152-
.map(TryInto::try_into)
153-
.collect::<anyhow::Result<_>>()?,
154-
})
150+
let steps = pgm
151+
.steps
152+
.into_iter()
153+
.map(TryInto::try_into)
154+
.collect::<anyhow::Result<_>>()?;
155+
156+
Ok(Self::new(steps))
155157
}
156158
}
157159

@@ -215,8 +217,14 @@ pub mod rpc {
215217

216218
impl From<database::Program> for Program {
217219
fn from(pgm: database::Program) -> Self {
220+
// TODO: use unwrap_or_clone when stable
221+
let steps = match Arc::try_unwrap(pgm.steps) {
222+
Ok(steps) => steps,
223+
Err(arc) => (*arc).clone(),
224+
};
225+
218226
Self {
219-
steps: pgm.steps.into_iter().map(|s| s.into()).collect(),
227+
steps: steps.into_iter().map(|s| s.into()).collect(),
220228
}
221229
}
222230
}

0 commit comments

Comments
 (0)