Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 950cede

Browse files
Merge #259
259: fix dump bug r=MarinPostma a=MarinPostma Fix bug that prevented large dumps to be loaded Co-authored-by: ad hoc <[email protected]>
2 parents ce7964e + d5fdb9d commit 950cede

File tree

4 files changed

+80
-10
lines changed

4 files changed

+80
-10
lines changed

sqld/src/http/mod.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -229,18 +229,27 @@ fn handle_version() -> Response<Body> {
229229
Response::new(Body::from(version.as_bytes()))
230230
}
231231

232+
async fn load_body(mut req: Request<Body>) -> anyhow::Result<Bytes> {
233+
let mut bytes = BytesMut::new();
234+
while let Some(data) = req.data().await {
235+
let data = data?;
236+
bytes.extend(data);
237+
}
238+
239+
Ok(bytes.freeze())
240+
}
241+
232242
async fn load_dump(
233-
mut req: Request<Body>,
243+
req: Request<Body>,
234244
sender: mpsc::Sender<Message>,
235245
) -> anyhow::Result<Response<Body>> {
236-
match req.data().await {
237-
Some(Ok(data)) => {
246+
match load_body(req).await {
247+
Ok(data) => {
238248
// FIXME: Dumps may not fit in memory. A better way would be to stream the payload, and
239249
// have a dedicated path to load the dump from it.
240250
let mut queries = Vec::new();
241-
let s = String::from_utf8(data.to_vec())?;
242-
for line in s.lines() {
243-
let stmt = Statement::new_unchecked(line);
251+
for stmt in Statement::parse_unchecked(&data) {
252+
let stmt = stmt?;
244253
queries.push(Query {
245254
stmt,
246255
params: Params::empty(),
@@ -258,8 +267,7 @@ async fn load_dump(
258267
Err(e) => Ok(error(&e.to_string(), StatusCode::INTERNAL_SERVER_ERROR)),
259268
}
260269
}
261-
Some(Err(e)) => Ok(error(&e.to_string(), StatusCode::INTERNAL_SERVER_ERROR)),
262-
None => Ok(Response::new(Body::empty())),
270+
Err(e) => Ok(error(&e.to_string(), StatusCode::INTERNAL_SERVER_ERROR)),
263271
}
264272
}
265273

sqld/src/query_analysis.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,33 @@ impl Statement {
9494
/// Returns a statement instance without performing any validation to the input
9595
/// It is always assumed that such a statement will be a write, and will always be handled by
9696
/// the primary
97-
pub fn new_unchecked(s: &str) -> Self {
97+
pub fn new_unchecked(stmt: String) -> Self {
9898
Self {
99-
stmt: s.to_string(),
99+
stmt,
100100
kind: StmtKind::Write,
101101
is_iud: false,
102102
}
103103
}
104104

105+
/// parses a series of statements into unchecked statemments
106+
pub fn parse_unchecked(s: &[u8]) -> impl Iterator<Item = Result<Self>> + '_ {
107+
let mut parser = Box::new(Parser::new(s));
108+
std::iter::from_fn(move || match parser.next() {
109+
Ok(Some(cmd)) => Some(Ok(Self::new_unchecked(cmd.to_string()))),
110+
Ok(None) => None,
111+
Err(sqlite3_parser::lexer::sql::Error::ParserError(
112+
ParserError::SyntaxError {
113+
token_type: _,
114+
found: Some(found),
115+
},
116+
Some((line, col)),
117+
)) => Some(Err(anyhow::anyhow!(
118+
"syntax error around L{line}:{col}: `{found}`"
119+
))),
120+
Err(e) => Some(Err(e.into())),
121+
})
122+
}
123+
105124
pub fn parse(s: &str) -> impl Iterator<Item = Result<Self>> + '_ {
106125
fn parse_inner(c: Cmd) -> Result<Statement> {
107126
let kind =

testing/end-to-end/integration/basic_cluster/dump.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,45 @@ async fn load_dump_from_replica(app: App) {
6565

6666
insta::assert_json_snapshot!(resp.json::<serde_json::Value>().await.unwrap());
6767
}
68+
69+
#[octopod::test(app = "simple-cluster")]
70+
async fn load_large_dump(app: App) {
71+
let primary_ip = app.service("primary").unwrap().ip().await.unwrap();
72+
let url = format!("http://{primary_ip}:8080/load-dump");
73+
let client = reqwest::Client::new();
74+
75+
let dump = std::iter::once("CREATE TABLE test (who);\n")
76+
.chain(std::iter::repeat("INSERT INTO test VALUES ('hello world');\n").take(10000))
77+
.fold(String::new(), |acc, s| acc + s);
78+
79+
let resp = client.post(url).body(dump).send().await.unwrap();
80+
assert!(resp.status().is_success(), "{}", resp.text().await.unwrap());
81+
82+
// wait for dump to be loaded and replicated
83+
tokio::time::sleep(Duration::from_secs(3)).await;
84+
85+
let url = format!("http://{primary_ip}:8080");
86+
let resp = client
87+
.post(url)
88+
.json(&json!({
89+
"statements": ["select count(*) from test"]
90+
}))
91+
.send()
92+
.await
93+
.unwrap();
94+
95+
insta::assert_json_snapshot!(resp.json::<serde_json::Value>().await.unwrap());
96+
97+
// ensure replica is up to date
98+
let replica_id = app.service("replica").unwrap().ip().await.unwrap();
99+
let url = format!("http://{replica_id}:8080");
100+
let resp = client
101+
.post(url)
102+
.json(&json!({
103+
"statements": ["select count(*) from test"]
104+
}))
105+
.send()
106+
.await
107+
.unwrap();
108+
insta::assert_json_snapshot!(resp.json::<serde_json::Value>().await.unwrap());
109+
}

testing/end-to-end/run-macos.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@
33
podman build ../.. -t sqld
44
podman save -o sqld.tar sqld
55
podman build . -t end-to-end --isolation=chroot
6+
echo "here"
67
podman run --privileged -v $PWD:/end-to-end -it --rm end-to-end
78
rm sqld.tar

0 commit comments

Comments
 (0)