This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 4f059f5

bors[bot] and honzasp authored
Merge #300
300: Hrana batches r=honzasp a=honzasp

The whole "edge" is (only) about reducing latency, so sqld clients should strive to minimize the number of network roundtrips. This means that users should prefer batched transactions to interactive transactions; a batched transaction should take just a single roundtrip.

Unfortunately, up until now it was impossible to safely implement batched transactions in Hrana with just a single roundtrip. To execute a batch of statements _S1, ..., Sn_, we need to do the following:

- Execute a `BEGIN` statement
- If the `BEGIN` statement was successful, execute _S1_
- If `BEGIN` and _S1_ were successful, execute _S2_
- ...
- If `BEGIN` and _S1, ..., Sn-1_ were successful, execute _Sn_
- If `BEGIN` and _S1, ..., Sn_ were successful, execute `COMMIT`, otherwise execute a `ROLLBACK`

In other words, we need to execute statements conditionally, based on results of previous statements. The only way to do this in Hrana is to wait for the responses, so we need _n+2_ roundtrips instead of just one!

---

This PR changes that by introducing the concept of "batch" to the Hrana protocol. A batch is a sequence of steps, and each step is an SQL statement with an optional condition. The condition can refer to the success or failure of previous steps. This mechanism will allow clients to implement batched transactions using a single roundtrip.

This PR replaces #294 and #295 after discussion with `@MarinPostma`.

Co-authored-by: Jan Špaček <[email protected]>
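The conditional sequence listed in the commit message could be expressed as a single batch roughly as follows. This is only a sketch: the types are simplified client-side mirrors of the `Batch` types this PR adds to the spec (the real `Stmt` has more fields than `sql`), and `transactionBatch` is a hypothetical helper name, not part of sqld or any client library.

```typescript
// Simplified, hypothetical mirrors of the spec types from this PR.
type Stmt = { sql: string };

type BatchCond =
    | { type: "ok"; step: number }
    | { type: "error"; step: number }
    | { type: "not"; cond: BatchCond }
    | { type: "and"; conds: BatchCond[] }
    | { type: "or"; conds: BatchCond[] };

type BatchStep = { condition?: BatchCond | null; stmt: Stmt };
type Batch = { steps: BatchStep[] };

// Wrap the given statements in BEGIN ... COMMIT/ROLLBACK as one batch.
function transactionBatch(sqls: string[]): Batch {
    // Step 0 is unconditional.
    const steps: BatchStep[] = [{ stmt: { sql: "BEGIN" } }];

    // Each statement runs only if the step right before it succeeded.
    // Because the conditions are chained, "previous step ok" implies that
    // BEGIN and all earlier statements succeeded as well.
    for (const sql of sqls) {
        steps.push({
            condition: { type: "ok", step: steps.length - 1 },
            stmt: { sql },
        });
    }

    // Index of the last statement (or of BEGIN if there are no statements).
    const lastStep = steps.length - 1;

    // COMMIT if everything succeeded, otherwise ROLLBACK.
    steps.push({
        condition: { type: "ok", step: lastStep },
        stmt: { sql: "COMMIT" },
    });
    steps.push({
        condition: { type: "not", cond: { type: "ok", step: lastStep } },
        stmt: { sql: "ROLLBACK" },
    });

    return { steps };
}
```

The resulting object would be sent as the `batch` field of one `batch` request, so the whole transaction costs one roundtrip instead of _n+2_.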
2 parents 8a01508 + 3143d3e commit 4f059f5

9 files changed, 544 additions and 296 deletions

docs/HRANA_SPEC.md

Lines changed: 86 additions & 13 deletions
@@ -65,6 +65,11 @@ After a stream is opened, the client can execute SQL _statements_ on it. For the
 purposes of this protocol, the statements are arbitrary strings with optional
 parameters. The protocol can thus work with any SQL dialect.
 
+To reduce the number of roundtrips, the protocol supports batches of statements
+that are executed conditionally, based on success or failure of previous
+statements. This mechanism is used to implement non-interactive transactions in
+a single roundtrip.
+
 ## Messages
 
 All messages exchanged between the client and server are text messages encoded
@@ -74,7 +79,7 @@ messages with a more compact binary encoding.
 This specification describes the JSON messages using TypeScript syntax as
 follows:
 
-```
+```typescript
 type ClientMsg =
     | HelloMsg
     | RequestMsg
@@ -91,7 +96,7 @@ type `ServerMsg`. The type of the message is determined by its `type` field.
 
 ### Hello
 
-```
+```typescript
 type HelloMsg = {
     "type": "hello",
     "jwt": string | null,
@@ -106,7 +111,7 @@ such as with mutual TLS), the `jwt` field might be set to `null`.
 
 [rfc7519]: https://www.rfc-editor.org/rfc/rfc7519
 
-```
+```typescript
 type HelloOkMsg = {
     "type": "hello_ok",
 }
@@ -128,7 +133,7 @@ should close the WebSocket immediately.
 
 ### Request/response
 
-```
+```typescript
 type RequestMsg = {
     "type": "request",
     "request_id": int32,
@@ -141,7 +146,7 @@ messages. The client uses requests to open SQL streams and execute statements on
 them. The client assigns an identifier to every request, which is then used to
 match a response to the request.
 
-```
+```typescript
 type ResponseOkMsg = {
     "type": "response_ok",
     "request_id": int32,
@@ -170,7 +175,7 @@ other hand, the client should always receive messages, to avoid deadlock.
 
 ### Errors
 
-```
+```typescript
 type Error = {
     "message": string,
 }
@@ -196,24 +201,26 @@ response.
 
 Most of the work in the protocol happens in request/response interactions.
 
-```
+```typescript
 type Request =
     | OpenStreamReq
     | CloseStreamReq
     | ExecuteReq
+    | BatchReq
 
 type Response =
     | OpenStreamResp
     | CloseStreamResp
     | ExecuteResp
+    | BatchResp
 ```
 
 The type of the request and response is determined by its `type` field. The
 `type` of the response must always match the `type` of the request.
 
 ### Open stream
 
-```
+```typescript
 type OpenStreamReq = {
     "type": "open_stream",
     "stream_id": int32,
@@ -242,7 +249,7 @@ same time.
 
 ### Close stream
 
-```
+```typescript
 type CloseStreamReq = {
     "type": "close_stream",
     "stream_id": int32,
@@ -262,7 +269,7 @@ returned an error.
 
 ### Execute a statement
 
-```
+```typescript
 type ExecuteReq = {
     "type": "execute",
     "stream_id": int32,
@@ -278,7 +285,7 @@ type ExecuteResp = {
 The client sends an `execute` request to execute an SQL statement on a stream.
 The server responds with the result of the statement.
 
-```
+```typescript
 type Stmt = {
     "sql": string,
     "args"?: Array<Value>,
@@ -315,7 +322,7 @@ reply with no rows, even if the statement produced some.
 The SQL text should contain just a single statement. Issuing multiple statements
 separated by a semicolon is not supported.
 
-```
+```typescript
 type StmtResult = {
     "cols": Array<Col>,
     "rows": Array<Array<Value>>,
@@ -340,9 +347,75 @@ DELETE, and the value is otherwise undefined.
 table. The rowid value is a 64-bit signed integer encoded as a string. For
 other statements, the value is undefined.
 
-### Values
+### Execute a batch
+
+```typescript
+type BatchReq = {
+    "type": "batch",
+    "stream_id": int32,
+    "batch": Batch,
+}
 
+type BatchResp = {
+    "type": "batch",
+    "result": BatchResult,
+}
 ```
+
+The `batch` request runs a batch of statements on a stream. The server responds
+with the result of the batch execution.
+
+```typescript
+type Batch = {
+    "steps": Array<BatchStep>,
+}
+
+type BatchStep = {
+    "condition"?: BatchCond | null,
+    "stmt": Stmt,
+}
+
+type BatchResult = {
+    "step_results": Array<StmtResult | null>,
+    "step_errors": Array<Error | null>,
+}
+```
+
+A batch is a list of steps (statements) which are always executed sequentially.
+If the `condition` of a step is present and evaluates to false, the statement is
+skipped.
+
+The batch result contains the results or errors of statements from each step.
+For the step in `steps[i]`, `step_results[i]` contains the result of the
+statement if the statement was executed and succeeded, and `step_errors[i]`
+contains the error if the statement was executed and failed. If the statement
+was skipped because its condition evaluated to false, both `step_results[i]` and
+`step_errors[i]` will be `null`.
+
+```typescript
+type BatchCond =
+    | { "type": "ok", "step": int32 }
+    | { "type": "error", "step": int32 }
+    | { "type": "not", "cond": BatchCond }
+    | { "type": "and", "conds": Array<BatchCond> }
+    | { "type": "or", "conds": Array<BatchCond> }
+```
+
+Conditions are expressions that evaluate to true or false:
+
+- `ok` evaluates to true if the `step` (referenced by its 0-based index) was
+executed successfully. If the statement was skipped, this condition evaluates to
+false.
+- `error` evaluates to true if the `step` (referenced by its 0-based index) has
+produced an error. If the statement was skipped, this condition evaluates to
+false.
+- `not` evaluates `cond` and returns the logical negative.
+- `and` evaluates `conds` and returns the logical conjunction of them.
+- `or` evaluates `conds` and returns the logical disjunction of them.
+
+### Values
+
+```typescript
 type Value =
     | { "type": "null" }
     | { "type": "integer", "value": string }
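The spec text above says that for each step exactly one of three cases holds: a result, an error, or both `null` (skipped). A client might fold the two parallel arrays of `BatchResult` into one list of outcomes, as in the sketch below. The types are simplified stand-ins for the spec types, `ProtoError` is used instead of the spec's `Error` to avoid shadowing the built-in, and `stepOutcomes` is a hypothetical helper name.

```typescript
// Simplified, hypothetical mirrors of the spec types.
type StmtResult = { cols: unknown[]; rows: unknown[][] };
type ProtoError = { message: string };

type BatchResult = {
    step_results: Array<StmtResult | null>;
    step_errors: Array<ProtoError | null>;
};

type StepOutcome =
    | { kind: "ok"; result: StmtResult }
    | { kind: "error"; message: string }
    | { kind: "skipped" };

// Combine step_results[i] and step_errors[i] into a single outcome per step.
function stepOutcomes(res: BatchResult): StepOutcome[] {
    return res.step_results.map((result, i): StepOutcome => {
        const error = res.step_errors[i] ?? null;
        if (result !== null) {
            // The statement was executed and succeeded.
            return { kind: "ok", result };
        }
        if (error !== null) {
            // The statement was executed and failed.
            return { kind: "error", message: error.message };
        }
        // Both entries are null: the condition evaluated to false.
        return { kind: "skipped" };
    });
}
```

For a transactional batch, a client would typically check that the `COMMIT` step has kind `"ok"` and otherwise report the first `"error"` outcome.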

sqld/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ rusqlite = { version = "0.28.0", git = "https://github.com/psarna/rusqlite", rev
     "bundled-libsql-wasm",
     "column_decltype"
 ] }
-serde = { version = "1.0.149", features = ["derive"] }
+serde = { version = "1.0.149", features = ["derive", "rc"] }
 serde_json = "1.0.91"
 smallvec = "1.10.0"
 sqld-libsql-bindings = { version = "0", path = "../sqld-libsql-bindings" }

sqld/src/batch/mod.rs

Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
+use self::stmt::proto_error_from_stmt_error;
+pub use self::stmt::{execute_stmt, StmtError};
+use crate::database::Database;
+use anyhow::{bail, Result};
+
+pub mod proto;
+mod stmt;
+
+#[derive(thiserror::Error, Debug)]
+pub enum BatchError {
+    #[error("Invalid reference to step in a condition")]
+    CondBadStep,
+}
+
+#[derive(Debug, Default)]
+struct Ctx {
+    results: Vec<Option<Result<proto::StmtResult, StmtError>>>,
+}
+
+pub async fn execute_batch(db: &dyn Database, prog: &proto::Batch) -> Result<proto::BatchResult> {
+    let mut ctx = Ctx::default();
+    for step in prog.steps.iter() {
+        execute_step(&mut ctx, db, step).await?;
+    }
+
+    let mut step_results = Vec::with_capacity(ctx.results.len());
+    let mut step_errors = Vec::with_capacity(ctx.results.len());
+    for result in ctx.results.into_iter() {
+        let (step_result, step_error) = match result {
+            Some(Ok(stmt_res)) => (Some(stmt_res), None),
+            Some(Err(stmt_err)) => (None, Some(proto_error_from_stmt_error(&stmt_err))),
+            None => (None, None),
+        };
+        step_results.push(step_result);
+        step_errors.push(step_error);
+    }
+
+    Ok(proto::BatchResult {
+        step_results,
+        step_errors,
+    })
+}
+
+async fn execute_step(ctx: &mut Ctx, db: &dyn Database, step: &proto::BatchStep) -> Result<()> {
+    let enabled = match step.condition.as_ref() {
+        Some(cond) => eval_cond(ctx, cond)?,
+        None => true,
+    };
+
+    let result = if enabled {
+        Some(match execute_stmt(db, &step.stmt).await {
+            Ok(stmt_result) => Ok(stmt_result),
+            Err(err) => Err(err.downcast::<StmtError>()?),
+        })
+    } else {
+        None
+    };
+
+    ctx.results.push(result);
+    Ok(())
+}
+
+fn eval_cond(ctx: &Ctx, cond: &proto::BatchCond) -> Result<bool> {
+    let get_step_res = |step: i32| -> Result<&Option<Result<proto::StmtResult, StmtError>>> {
+        let Ok(step) = usize::try_from(step) else {
+            bail!(BatchError::CondBadStep)
+        };
+        let Some(res) = ctx.results.get(step) else {
+            bail!(BatchError::CondBadStep)
+        };
+        Ok(res)
+    };
+
+    Ok(match cond {
+        proto::BatchCond::Ok { step } => match get_step_res(*step)? {
+            Some(Ok(_)) => true,
+            Some(Err(_)) => false,
+            None => false,
+        },
+        proto::BatchCond::Error { step } => match get_step_res(*step)? {
+            Some(Ok(_)) => false,
+            Some(Err(_)) => true,
+            None => false,
+        },
+        proto::BatchCond::Not { cond } => !eval_cond(ctx, cond)?,
+        proto::BatchCond::And { conds } => conds
+            .iter()
+            .try_fold(true, |x, cond| eval_cond(ctx, cond).map(|y| x & y))?,
+        proto::BatchCond::Or { conds } => conds
+            .iter()
+            .try_fold(false, |x, cond| eval_cond(ctx, cond).map(|y| x | y))?,
+    })
+}
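To make the control flow of `execute_batch` and `eval_cond` easier to follow, here is a rough TypeScript model of the same logic. It is an illustration only, not a port: `StepState`, `Step`, `evalCond`, `simulateBatch` and the synchronous `run` callback are hypothetical names, and statement execution is reduced to a boolean success flag.

```typescript
type BatchCond =
    | { type: "ok"; step: number }
    | { type: "error"; step: number }
    | { type: "not"; cond: BatchCond }
    | { type: "and"; conds: BatchCond[] }
    | { type: "or"; conds: BatchCond[] };

// Simplified stand-in for Option<Result<StmtResult, StmtError>>.
type StepState = "ok" | "error" | "skipped";

// `states` holds only the steps executed so far, so a condition can
// refer to earlier steps only; anything else is an invalid reference.
function evalCond(states: StepState[], cond: BatchCond): boolean {
    switch (cond.type) {
        case "ok":
        case "error": {
            const state = states[cond.step];
            if (state === undefined) {
                throw new Error("Invalid reference to step in a condition");
            }
            // A skipped step satisfies neither "ok" nor "error".
            return state === cond.type;
        }
        case "not":
            return !evalCond(states, cond.cond);
        case "and":
            // As in the Rust fold with `&`, there is no short-circuit on
            // false: every sub-condition is evaluated and validated.
            return cond.conds.map((c) => evalCond(states, c)).every((v) => v);
        case "or":
            return cond.conds.map((c) => evalCond(states, c)).some((v) => v);
    }
}

type Step = { condition?: BatchCond | null; sql: string };

// `run` is a hypothetical executor: it returns true if the statement succeeded.
function simulateBatch(steps: Step[], run: (sql: string) => boolean): StepState[] {
    const states: StepState[] = [];
    for (const step of steps) {
        // A missing or null condition means the step always runs.
        const enabled = step.condition == null || evalCond(states, step.condition);
        states.push(!enabled ? "skipped" : run(step.sql) ? "ok" : "error");
    }
    return states;
}
```

One consequence visible in this model: an invalid step reference aborts the whole batch with an error instead of being recorded as a per-step error, which matches the `?` on `eval_cond` in `execute_step`.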
