Skip to content

Commit 4038e9e

Browse files
dvdplmniklasad1insipx
authored
[ws-server] Batch support (#300)
* Batch requests over the websocket * Use same call signature for both http and ws server * Use CallError instead of InvalidParams Add a draft batch request test * Sort out formatting of batch responses Cleanup Use CallError * Add test for slow method calls * Update utils/src/server.rs Co-authored-by: Niklas Adolfsson <[email protected]> * Add benchmark for batched websocket requests * rename * Rename batch channels * Sort out the InvalidParams situation * Remove one level of InvalidParams Return app-level error when call fails * Update ws-server/src/server.rs Co-authored-by: Andrew Plaza <[email protected]> * Update ws-server/src/server.rs Co-authored-by: Andrew Plaza <[email protected]> Co-authored-by: Niklas Adolfsson <[email protected]> Co-authored-by: Andrew Plaza <[email protected]>
1 parent b51abec commit 4038e9e

File tree

9 files changed

+235
-79
lines changed

9 files changed

+235
-79
lines changed

benches/bench.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ pub fn websocket_requests(crit: &mut Criterion) {
6363
run_concurrent_round_trip(&rt, crit, client.clone(), "ws_concurrent_round_trip");
6464
}
6565

66+
pub fn batched_ws_requests(crit: &mut Criterion) {
67+
let rt = TokioRuntime::new().unwrap();
68+
let url = rt.block_on(helpers::ws_server());
69+
let client =
70+
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
71+
run_round_trip_with_batch(&rt, crit, client.clone(), "ws batch requests");
72+
}
73+
6674
fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Client>, name: &str) {
6775
crit.bench_function(name, |b| {
6876
b.iter(|| {

http-server/src/module.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use jsonrpsee_types::v2::error::{JsonRpcErrorCode, JsonRpcErrorObject, CALL_EXECUTION_FAILED_CODE};
22
use jsonrpsee_types::{
3-
error::{CallError, Error, InvalidParams},
3+
error::{CallError, Error},
44
traits::RpcMethod,
55
v2::params::RpcParams,
66
};
@@ -36,7 +36,7 @@ impl RpcModule {
3636
pub fn register_method<F, R>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
3737
where
3838
R: Serialize,
39-
F: RpcMethod<R, InvalidParams>,
39+
F: RpcMethod<R, CallError>,
4040
{
4141
self.verify_method_name(method_name)?;
4242

@@ -45,7 +45,16 @@ impl RpcModule {
4545
Box::new(move |id, params, tx, _| {
4646
match callback(params) {
4747
Ok(res) => send_response(id, tx, res),
48-
Err(InvalidParams) => send_error(id, tx, JsonRpcErrorCode::InvalidParams.into()),
48+
Err(CallError::InvalidParams) => send_error(id, tx, JsonRpcErrorCode::InvalidParams.into()),
49+
Err(CallError::Failed(err)) => {
50+
log::error!("Call failed with: {}", err);
51+
let err = JsonRpcErrorObject {
52+
code: JsonRpcErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE),
53+
message: &err.to_string(),
54+
data: None,
55+
};
56+
send_error(id, tx, err)
57+
}
4958
};
5059

5160
Ok(())
@@ -99,7 +108,7 @@ impl<Context> RpcContextModule<Context> {
99108
Box::new(move |id, params, tx, _| {
100109
match callback(params, &*ctx) {
101110
Ok(res) => send_response(id, tx, res),
102-
Err(CallError::InvalidParams(_)) => send_error(id, tx, JsonRpcErrorCode::InvalidParams.into()),
111+
Err(CallError::InvalidParams) => send_error(id, tx, JsonRpcErrorCode::InvalidParams.into()),
103112
Err(CallError::Failed(err)) => {
104113
let err = JsonRpcErrorObject {
105114
code: JsonRpcErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE),

http-server/src/server.rs

Lines changed: 25 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,14 @@ use hyper::{
3636
service::{make_service_fn, service_fn},
3737
Error as HyperError,
3838
};
39-
use jsonrpsee_types::error::{Error, GenericTransportError, InvalidParams};
39+
use jsonrpsee_types::error::{CallError, Error, GenericTransportError};
4040
use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest};
4141
use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams};
4242
use jsonrpsee_utils::{
4343
hyper_helpers::read_response_to_body,
44-
server::{send_error, RpcSender},
44+
server::{collect_batch_response, send_error, RpcSender},
4545
};
4646
use serde::Serialize;
47-
use serde_json::value::RawValue;
4847
use socket2::{Domain, Socket, Type};
4948
use std::{
5049
cmp,
@@ -129,7 +128,7 @@ impl Server {
129128
pub fn register_method<F, R>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
130129
where
131130
R: Serialize,
132-
F: Fn(RpcParams) -> Result<R, InvalidParams> + Send + Sync + 'static,
131+
F: Fn(RpcParams) -> Result<R, CallError> + Send + Sync + 'static,
133132
{
134133
self.root.register_method(method_name, callback)
135134
}
@@ -162,23 +161,23 @@ impl Server {
162161
// Look up the "method" (i.e. function pointer) from the registered methods and run it passing in
163162
// the params from the request. The result of the computation is sent back over the `tx` channel and
164163
// the result(s) are collected into a `String` and sent back over the wire.
165-
let execute =
166-
move |id: Option<&RawValue>, tx: RpcSender, method_name: &str, params: Option<&RawValue>| {
167-
if let Some(method) = methods.get(method_name) {
168-
let params = RpcParams::new(params.map(|params| params.get()));
169-
// NOTE(niklasad1): connection ID is unused thus hardcoded to `0`.
170-
if let Err(err) = (method)(id, params, &tx, 0) {
171-
log::error!(
172-
"execution of method call '{}' failed: {:?}, request id={:?}",
173-
method_name,
174-
err,
175-
id
176-
);
177-
}
178-
} else {
179-
send_error(id, tx, JsonRpcErrorCode::MethodNotFound.into());
164+
let execute = move |tx: RpcSender, req: JsonRpcRequest| {
165+
if let Some(method) = methods.get(&*req.method) {
166+
let params = RpcParams::new(req.params.map(|params| params.get()));
167+
// NOTE(niklasad1): connection ID is unused thus hardcoded to `0`.
168+
if let Err(err) = (method)(req.id, params, &tx, 0) {
169+
log::error!(
170+
"execution of method call '{}' failed: {:?}, request id={:?}",
171+
req.method,
172+
err,
173+
req.id
174+
);
175+
send_error(req.id, &tx, JsonRpcErrorCode::ServerError(-1).into());
180176
}
181-
};
177+
} else {
178+
send_error(req.id, &tx, JsonRpcErrorCode::MethodNotFound.into());
179+
}
180+
};
182181

183182
// Run some validation on the http request, then read the body and try to deserialize it into one of
184183
// two cases: a single RPC request or a batch of RPC requests.
@@ -203,7 +202,7 @@ impl Server {
203202
};
204203

205204
// NOTE(niklasad1): it's a channel because it's needed for batch requests.
206-
let (tx, mut rx) = mpsc::unbounded();
205+
let (tx, mut rx) = mpsc::unbounded::<String>();
207206
// Is this a single request or a batch (or error)?
208207
let mut single = true;
209208

@@ -213,15 +212,13 @@ impl Server {
213212
// batch case and lastly the error. For the worst case – unparseable input – we make three calls
214213
// to [`serde_json::from_slice`] which is pretty annoying.
215214
// Our [issue](https://github.com/paritytech/jsonrpsee/issues/296).
216-
if let Ok(JsonRpcRequest { id, method: method_name, params, .. }) =
217-
serde_json::from_slice::<JsonRpcRequest>(&body)
218-
{
219-
execute(id, &tx, &method_name, params);
215+
if let Ok(req) = serde_json::from_slice::<JsonRpcRequest>(&body) {
216+
execute(&tx, req);
220217
} else if let Ok(batch) = serde_json::from_slice::<Vec<JsonRpcRequest>>(&body) {
221218
if !batch.is_empty() {
222219
single = false;
223-
for JsonRpcRequest { id, method: method_name, params, .. } in batch {
224-
execute(id, &tx, &method_name, params);
220+
for req in batch {
221+
execute(&tx, req);
225222
}
226223
} else {
227224
send_error(None, &tx, JsonRpcErrorCode::InvalidRequest.into());
@@ -243,7 +240,7 @@ impl Server {
243240
let response = if single {
244241
rx.next().await.expect("Sender is still alive managed by us above; qed")
245242
} else {
246-
collect_batch_responses(rx).await
243+
collect_batch_response(rx).await
247244
};
248245
log::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]);
249246
Ok::<_, HyperError>(response::ok_response(response))
@@ -257,24 +254,6 @@ impl Server {
257254
}
258255
}
259256

260-
// Collect the results of all computations sent back on the ['Stream'] into a single `String` appropriately wrapped in
261-
// `[`/`]`.
262-
async fn collect_batch_responses(rx: mpsc::UnboundedReceiver<String>) -> String {
263-
let mut buf = String::with_capacity(2048);
264-
buf.push('[');
265-
let mut buf = rx
266-
.fold(buf, |mut acc, response| async {
267-
acc = [acc, response].concat();
268-
acc.push(',');
269-
acc
270-
})
271-
.await;
272-
// Remove trailing comma
273-
buf.pop();
274-
buf.push(']');
275-
buf
276-
}
277-
278257
// Checks to that access control of the received request is the same as configured.
279258
fn access_control_is_valid(
280259
access_control: &AccessControl,

test-utils/src/helpers.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ pub fn internal_error(id: Id) -> String {
7272
)
7373
}
7474

75+
pub fn server_error(id: Id) -> String {
76+
format!(
77+
r#"{{"jsonrpc":"2.0","error":{{"code":-32000,"message":"Server error"}},"id":{}}}"#,
78+
serde_json::to_string(&id).unwrap()
79+
)
80+
}
81+
7582
/// Hardcoded server response when a client initiates a new subscription.
7683
///
7784
/// NOTE: works only for one subscription because the subscription ID is hardcoded.

types/src/error.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@ pub struct InvalidParams;
2525
pub enum CallError {
2626
#[error("Invalid params in the RPC call")]
2727
/// Invalid params in the call.
28-
InvalidParams(InvalidParams),
28+
InvalidParams,
2929
#[error("RPC Call failed: {0}")]
3030
/// The call failed.
3131
Failed(#[source] Box<dyn std::error::Error + Send + Sync>),
3232
}
3333

3434
impl From<InvalidParams> for CallError {
35-
fn from(params: InvalidParams) -> Self {
36-
Self::InvalidParams(params)
35+
fn from(_params: InvalidParams) -> Self {
36+
Self::InvalidParams
3737
}
3838
}
3939

utils/src/server.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Shared helpers for JSON-RPC Servers.
22
33
use futures_channel::mpsc;
4+
use futures_util::stream::StreamExt;
45
use jsonrpsee_types::v2::error::{JsonRpcError, JsonRpcErrorCode, JsonRpcErrorObject};
56
use jsonrpsee_types::v2::params::{RpcParams, TwoPointZero};
67
use jsonrpsee_types::v2::response::JsonRpcResponse;
@@ -50,3 +51,21 @@ pub fn send_error(id: RpcId, tx: RpcSender, error: JsonRpcErrorObject) {
5051
log::error!("Error sending response to the client: {:?}", err)
5152
}
5253
}
54+
55+
/// Read all the results of all method calls in a batch request from the ['Stream']. Format the result into a single
56+
/// `String` appropriately wrapped in `[`/`]`.
57+
pub async fn collect_batch_response(rx: mpsc::UnboundedReceiver<String>) -> String {
58+
let mut buf = String::with_capacity(2048);
59+
buf.push('[');
60+
let mut buf = rx
61+
.fold(buf, |mut acc, response| async {
62+
acc = [acc, response].concat();
63+
acc.push(',');
64+
acc
65+
})
66+
.await;
67+
// Remove trailing comma
68+
buf.pop();
69+
buf.push(']');
70+
buf
71+
}

ws-server/src/server.rs

Lines changed: 57 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
use futures_channel::mpsc;
2828
use futures_util::io::{BufReader, BufWriter};
29+
use futures_util::stream::StreamExt;
2930
use parking_lot::Mutex;
3031
use rustc_hash::FxHashMap;
3132
use serde::Serialize;
@@ -34,14 +35,14 @@ use soketto::handshake::{server::Response, Server as SokettoServer};
3435
use std::net::SocketAddr;
3536
use std::sync::Arc;
3637
use tokio::net::{TcpListener, ToSocketAddrs};
37-
use tokio_stream::{wrappers::TcpListenerStream, StreamExt};
38+
use tokio_stream::wrappers::TcpListenerStream;
3839
use tokio_util::compat::TokioAsyncReadCompatExt;
3940

40-
use jsonrpsee_types::error::{Error, InvalidParams};
41+
use jsonrpsee_types::error::{CallError, Error};
4142
use jsonrpsee_types::v2::error::JsonRpcErrorCode;
4243
use jsonrpsee_types::v2::params::{JsonRpcNotificationParams, RpcParams, TwoPointZero};
4344
use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcNotification, JsonRpcRequest};
44-
use jsonrpsee_utils::server::{send_error, ConnectionId, Methods};
45+
use jsonrpsee_utils::server::{collect_batch_response, send_error, ConnectionId, Methods, RpcSender};
4546

4647
mod module;
4748

@@ -105,7 +106,7 @@ impl Server {
105106
pub fn register_method<F, R>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
106107
where
107108
R: Serialize,
108-
F: Fn(RpcParams) -> Result<R, InvalidParams> + Send + Sync + 'static,
109+
F: Fn(RpcParams) -> Result<R, CallError> + Send + Sync + 'static,
109110
{
110111
self.root.register_method(method_name, callback)
111112
}
@@ -149,7 +150,11 @@ impl Server {
149150
}
150151
}
151152

152-
async fn background_task(socket: tokio::net::TcpStream, methods: Arc<Methods>, id: ConnectionId) -> anyhow::Result<()> {
153+
async fn background_task(
154+
socket: tokio::net::TcpStream,
155+
methods: Arc<Methods>,
156+
conn_id: ConnectionId,
157+
) -> anyhow::Result<()> {
153158
// For each incoming background_task we perform a handshake.
154159
let mut server = SokettoServer::new(BufReader::new(BufWriter::new(socket.compat())));
155160

@@ -166,38 +171,69 @@ async fn background_task(socket: tokio::net::TcpStream, methods: Arc<Methods>, i
166171
let (mut sender, mut receiver) = server.into_builder().finish();
167172
let (tx, mut rx) = mpsc::unbounded::<String>();
168173

174+
// Send results back to the client.
169175
tokio::spawn(async move {
170176
while let Some(response) = rx.next().await {
171177
let _ = sender.send_binary_mut(response.into_bytes()).await;
172178
let _ = sender.flush().await;
173179
}
174180
});
175181

176-
let mut data = Vec::new();
182+
let mut data = Vec::with_capacity(100);
183+
184+
// Look up the "method" (i.e. function pointer) from the registered methods and run it passing in
185+
// the params from the request. The result of the computation is sent back over the `tx` channel and
186+
// the result(s) are collected into a `String` and sent back over the wire.
187+
let execute = move |tx: RpcSender, req: JsonRpcRequest| {
188+
if let Some(method) = methods.get(&*req.method) {
189+
let params = RpcParams::new(req.params.map(|params| params.get()));
190+
if let Err(err) = (method)(req.id, params, &tx, conn_id) {
191+
log::error!("execution of method call '{}' failed: {:?}, request id={:?}", req.method, err, req.id);
192+
send_error(req.id, &tx, JsonRpcErrorCode::ServerError(-1).into());
193+
}
194+
} else {
195+
send_error(req.id, &tx, JsonRpcErrorCode::MethodNotFound.into());
196+
}
197+
};
177198

178199
loop {
179200
data.clear();
180201

181202
receiver.receive_data(&mut data).await?;
182203

183-
match serde_json::from_slice::<JsonRpcRequest>(&data) {
184-
Ok(req) => {
185-
let params = RpcParams::new(req.params.map(|params| params.get()));
186-
187-
if let Some(method) = methods.get(&*req.method) {
188-
(method)(req.id, params, &tx, id)?;
189-
} else {
190-
send_error(req.id, &tx, JsonRpcErrorCode::MethodNotFound.into());
204+
// For reasons outlined [here](https://github.com/serde-rs/json/issues/497), `RawValue` can't be used with
205+
// untagged enums at the moment. This means we can't use an `SingleOrBatch` untagged enum here and have to try
206+
// each case individually: first the single request case, then the batch case and lastly the error. For the
207+
// worst case – unparseable input – we make three calls to [`serde_json::from_slice`] which is pretty annoying.
208+
// Our [issue](https://github.com/paritytech/jsonrpsee/issues/296).
209+
if let Ok(req) = serde_json::from_slice::<JsonRpcRequest>(&data) {
210+
execute(&tx, req);
211+
} else if let Ok(batch) = serde_json::from_slice::<Vec<JsonRpcRequest>>(&data) {
212+
if !batch.is_empty() {
213+
// Batch responses must be sent back as a single message so we read the results from each request in the
214+
// batch and read the results off of a new channel, `rx_batch`, and then send the complete batch response
215+
// back to the client over `tx`.
216+
let (tx_batch, mut rx_batch) = mpsc::unbounded::<String>();
217+
for req in batch {
218+
execute(&tx_batch, req);
219+
}
220+
// Closes the receiving half of a channel without dropping it. This prevents any further messages from
221+
// being sent on the channel.
222+
rx_batch.close();
223+
let results = collect_batch_response(rx_batch).await;
224+
if let Err(err) = tx.unbounded_send(results) {
225+
log::error!("Error sending batch response to the client: {:?}", err)
191226
}
227+
} else {
228+
send_error(None, &tx, JsonRpcErrorCode::InvalidRequest.into());
192229
}
193-
Err(_) => {
194-
let (id, code) = match serde_json::from_slice::<JsonRpcInvalidRequest>(&data) {
195-
Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest),
196-
Err(_) => (None, JsonRpcErrorCode::ParseError),
197-
};
230+
} else {
231+
let (id, code) = match serde_json::from_slice::<JsonRpcInvalidRequest>(&data) {
232+
Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest),
233+
Err(_) => (None, JsonRpcErrorCode::ParseError),
234+
};
198235

199-
send_error(id, &tx, code.into());
200-
}
236+
send_error(id, &tx, code.into());
201237
}
202238
}
203239
}

0 commit comments

Comments
 (0)