Skip to content

Commit 4f51e2f

Browse files
niklasad1dvdplm
andauthored
[servers] extract rpc modules to utils (#322)
* [rpc server]: extract rpc_module to utils. This commit extracts the `rpc_module` from the servers to be shared. It will help to re-use rpc modules within both the servers * Update utils/src/server/rpc_module.rs Co-authored-by: David <[email protected]> * Update utils/src/server/rpc_module.rs Co-authored-by: David <[email protected]> * Update utils/src/server/mod.rs Co-authored-by: David <[email protected]> * Update utils/src/server/rpc_module.rs Co-authored-by: David <[email protected]> * Update utils/src/server/rpc_module.rs Co-authored-by: David <[email protected]> * Update utils/src/server/rpc_module.rs Co-authored-by: David <[email protected]> * Update utils/src/server/rpc_module.rs Co-authored-by: David <[email protected]> * Update utils/src/server/rpc_module.rs Co-authored-by: David <[email protected]> * Update utils/src/server/rpc_module.rs Co-authored-by: David <[email protected]> * Update utils/src/server/rpc_module.rs Co-authored-by: David <[email protected]> * address grumbles * fix build * fix docs * cargo fmt * Update utils/src/server/rpc_module.rs Co-authored-by: David <[email protected]> * grumbles: use MethodSink * Update utils/src/server/mod.rs Co-authored-by: David <[email protected]> * Update utils/src/server/rpc_module.rs Co-authored-by: David <[email protected]> * Update utils/src/server/rpc_module.rs Co-authored-by: David <[email protected]> Co-authored-by: David <[email protected]>
1 parent 8780fce commit 4f51e2f

File tree

11 files changed

+118
-226
lines changed

11 files changed

+118
-226
lines changed

http-server/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,12 @@
2525
// DEALINGS IN THE SOFTWARE.
2626

2727
mod access_control;
28-
mod module;
2928
mod response;
3029
mod server;
3130

3231
pub use access_control::{AccessControl, AccessControlBuilder, AllowHosts, Host};
3332
pub use jsonrpsee_types::{Error, TEN_MB_SIZE_BYTES};
34-
pub use module::{RpcContextModule, RpcModule};
33+
pub use jsonrpsee_utils::server::rpc_module::{Methods, RpcContextModule, RpcModule};
3534
pub use server::{Builder as HttpServerBuilder, Server as HttpServer};
3635

3736
#[cfg(test)]

http-server/src/module.rs

Lines changed: 0 additions & 132 deletions
This file was deleted.

http-server/src/server.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@
2424
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
2525
// DEALINGS IN THE SOFTWARE.
2626

27-
use crate::module::RpcModule;
28-
use crate::response;
29-
use crate::AccessControl;
30-
use crate::TEN_MB_SIZE_BYTES;
27+
use crate::{response, AccessControl, TEN_MB_SIZE_BYTES};
3128
use anyhow::anyhow;
3229
use futures_channel::mpsc;
3330
use futures_util::stream::StreamExt;
@@ -39,10 +36,10 @@ use hyper::{
3936
use jsonrpsee_types::error::{CallError, Error, GenericTransportError};
4037
use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest};
4138
use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams};
42-
use jsonrpsee_utils::{
43-
hyper_helpers::read_response_to_body,
44-
server::{collect_batch_response, send_error, RpcSender},
45-
};
39+
use jsonrpsee_utils::hyper_helpers::read_response_to_body;
40+
use jsonrpsee_utils::server::helpers::{collect_batch_response, send_error};
41+
use jsonrpsee_utils::server::rpc_module::{MethodSink, RpcModule};
42+
4643
use serde::Serialize;
4744
use socket2::{Domain, Socket, Type};
4845
use std::{
@@ -161,7 +158,7 @@ impl Server {
161158
// Look up the "method" (i.e. function pointer) from the registered methods and run it passing in
162159
// the params from the request. The result of the computation is sent back over the `tx` channel and
163160
// the result(s) are collected into a `String` and sent back over the wire.
164-
let execute = move |tx: RpcSender, req: JsonRpcRequest| {
161+
let execute = move |tx: &MethodSink, req: JsonRpcRequest| {
165162
if let Some(method) = methods.get(&*req.method) {
166163
let params = RpcParams::new(req.params.map(|params| params.get()));
167164
// NOTE(niklasad1): connection ID is unused thus hardcoded to `0`.

types/src/v2/params.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,3 +191,7 @@ impl Id {
191191
}
192192
}
193193
}
194+
195+
/// Untyped JSON-RPC ID.
196+
// TODO(niklasad1): this should be enforced to only accept: String, Number, or Null.
197+
pub type JsonRpcRawId<'a> = Option<&'a serde_json::value::RawValue>;

utils/Cargo.toml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,27 @@ hyper14 = { package = "hyper", version = "0.14", default-features = false, featu
1515
jsonrpsee-types = { path = "../types", version = "0.2.0-alpha.6", optional = true }
1616
log = { version = "0.4", optional = true }
1717
rustc-hash = { version = "1", optional = true }
18+
rand = { version = "0.8", optional = true }
1819
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
1920
serde_json = { version = "1", features = ["raw_value"], optional = true }
21+
parking_lot = { version = "0.11", optional = true }
2022

2123
[features]
2224
default = []
2325
hyper_13 = ["hyper13", "futures-util", "jsonrpsee-types"]
2426
hyper_14 = ["hyper14", "futures-util", "jsonrpsee-types"]
25-
server = ["anyhow", "futures-channel", "futures-util", "jsonrpsee-types", "rustc-hash", "serde", "serde_json", "log"]
27+
server = [
28+
"anyhow",
29+
"futures-channel",
30+
"futures-util",
31+
"jsonrpsee-types",
32+
"rustc-hash",
33+
"serde",
34+
"serde_json",
35+
"log",
36+
"parking_lot",
37+
"rand"
38+
]
2639

2740
[dev-dependencies]
2841
serde_json = "1.0"

utils/src/server.rs renamed to utils/src/server/helpers.rs

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,13 @@
1-
//! Shared helpers for JSON-RPC Servers.
2-
1+
use crate::server::rpc_module::MethodSink;
32
use futures_channel::mpsc;
43
use futures_util::stream::StreamExt;
54
use jsonrpsee_types::v2::error::{JsonRpcError, JsonRpcErrorCode, JsonRpcErrorObject};
6-
use jsonrpsee_types::v2::params::{RpcParams, TwoPointZero};
5+
use jsonrpsee_types::v2::params::{JsonRpcRawId, TwoPointZero};
76
use jsonrpsee_types::v2::response::JsonRpcResponse;
8-
use rustc_hash::FxHashMap;
97
use serde::Serialize;
10-
use serde_json::value::RawValue;
11-
12-
/// Connection ID.
13-
pub type ConnectionId = usize;
14-
/// Sender.
15-
pub type RpcSender<'a> = &'a mpsc::UnboundedSender<String>;
16-
/// RPC ID.
17-
pub type RpcId<'a> = Option<&'a RawValue>;
18-
/// Method registered in the server.
19-
pub type Method = Box<dyn Send + Sync + Fn(RpcId, RpcParams, RpcSender, ConnectionId) -> anyhow::Result<()>>;
20-
/// Methods registered in the Server.
21-
pub type Methods = FxHashMap<&'static str, Method>;
228

239
/// Helper for sending JSON-RPC responses to the client
24-
pub fn send_response(id: RpcId, tx: RpcSender, result: impl Serialize) {
10+
pub fn send_response(id: JsonRpcRawId, tx: &MethodSink, result: impl Serialize) {
2511
let json = match serde_json::to_string(&JsonRpcResponse { jsonrpc: TwoPointZero, id, result }) {
2612
Ok(json) => json,
2713
Err(err) => {
@@ -37,7 +23,7 @@ pub fn send_response(id: RpcId, tx: RpcSender, result: impl Serialize) {
3723
}
3824

3925
/// Helper for sending JSON-RPC errors to the client
40-
pub fn send_error(id: RpcId, tx: RpcSender, error: JsonRpcErrorObject) {
26+
pub fn send_error(id: JsonRpcRawId, tx: &MethodSink, error: JsonRpcErrorObject) {
4127
let json = match serde_json::to_string(&JsonRpcError { jsonrpc: TwoPointZero, error, id }) {
4228
Ok(json) => json,
4329
Err(err) => {

utils/src/server/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
//! Shared modules for the JSON-RPC servers.
2+
3+
/// Helpers.
4+
pub mod helpers;
5+
/// JSON-RPC "modules" group sets of methods that belong together and handles method/subscription registration.
6+
pub mod rpc_module;

ws-server/src/server/module.rs renamed to utils/src/server/rpc_module.rs

Lines changed: 76 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,36 @@
1-
use crate::server::{RpcParams, SubscriptionId, SubscriptionSink};
2-
use jsonrpsee_types::{
3-
error::{CallError, Error},
4-
v2::error::{JsonRpcErrorCode, JsonRpcErrorObject},
5-
};
6-
use jsonrpsee_types::{traits::RpcMethod, v2::error::CALL_EXECUTION_FAILED_CODE};
7-
use jsonrpsee_utils::server::{send_error, send_response, Methods};
1+
use crate::server::helpers::{send_error, send_response};
2+
use futures_channel::mpsc;
3+
use jsonrpsee_types::error::{CallError, Error};
4+
use jsonrpsee_types::traits::RpcMethod;
5+
use jsonrpsee_types::v2::error::{JsonRpcErrorCode, JsonRpcErrorObject, CALL_EXECUTION_FAILED_CODE};
6+
use jsonrpsee_types::v2::params::{JsonRpcNotificationParams, JsonRpcRawId, RpcParams, TwoPointZero};
7+
use jsonrpsee_types::v2::request::JsonRpcNotification;
8+
89
use parking_lot::Mutex;
910
use rustc_hash::FxHashMap;
1011
use serde::Serialize;
12+
use serde_json::value::to_raw_value;
1113
use std::sync::Arc;
1214

15+
/// A `Method` is an RPC endpoint, callable with a standard JSON-RPC request,
16+
/// implemented as a function pointer to a `Fn` function taking four arguments:
17+
/// the `id`, `params`, a channel the function uses to communicate the result (or error)
18+
/// back to `jsonrpsee`, and the connection ID (useful for the websocket transport).
19+
pub type Method = Box<dyn Send + Sync + Fn(JsonRpcRawId, RpcParams, &MethodSink, ConnectionId) -> anyhow::Result<()>>;
20+
/// A collection of registered [`Method`]s.
21+
pub type Methods = FxHashMap<&'static str, Method>;
22+
/// Connection ID, used for stateful protocol such as WebSockets.
23+
/// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value.
24+
pub type ConnectionId = usize;
25+
/// Subscription ID.
26+
pub type SubscriptionId = u64;
27+
/// Sink that is used to send back the result to the server for a specific method.
28+
pub type MethodSink = mpsc::UnboundedSender<String>;
29+
30+
type Subscribers = Arc<Mutex<FxHashMap<(ConnectionId, SubscriptionId), MethodSink>>>;
31+
32+
/// Sets of JSON-RPC methods can be organized into a "module" that are in turn registered on server or,
33+
/// alternatively, merged with other modules to construct a cohesive API.
1334
#[derive(Default)]
1435
pub struct RpcModule {
1536
methods: Methods,
@@ -122,11 +143,14 @@ impl RpcModule {
122143
Ok(SubscriptionSink { method: subscribe_method_name, subscribers })
123144
}
124145

125-
pub(crate) fn into_methods(self) -> Methods {
146+
/// Convert a module into methods.
147+
pub fn into_methods(self) -> Methods {
126148
self.methods
127149
}
128150

129-
pub(crate) fn merge(&mut self, other: RpcModule) -> Result<(), Error> {
151+
/// Merge two [`RpcModule`]'s by adding all [`Method`]s from `other` into `self`.
152+
/// Fails if any of the methods in `other` is present already.
153+
pub fn merge(&mut self, other: RpcModule) -> Result<(), Error> {
130154
for name in other.methods.keys() {
131155
self.verify_method_name(name)?;
132156
}
@@ -139,6 +163,8 @@ impl RpcModule {
139163
}
140164
}
141165

166+
/// Similar to [`RpcModule`] but wraps an additional context argument that can be used
167+
/// to access data during call execution.
142168
pub struct RpcContextModule<Context> {
143169
ctx: Arc<Context>,
144170
module: RpcModule,
@@ -188,3 +214,44 @@ impl<Context> RpcContextModule<Context> {
188214
self.module
189215
}
190216
}
217+
218+
/// Used by the server to send data back to subscribers.
219+
#[derive(Clone)]
220+
pub struct SubscriptionSink {
221+
method: &'static str,
222+
subscribers: Subscribers,
223+
}
224+
225+
impl SubscriptionSink {
226+
/// Send data back to subscribers.
227+
/// If a send fails (likely a broken connection) the subscriber is removed from the sink.
228+
/// O(n) in the number of subscribers.
229+
pub fn send<T>(&mut self, result: &T) -> anyhow::Result<()>
230+
where
231+
T: Serialize,
232+
{
233+
let result = to_raw_value(result)?;
234+
235+
let mut errored = Vec::new();
236+
let mut subs = self.subscribers.lock();
237+
238+
for ((conn_id, sub_id), sender) in subs.iter() {
239+
let msg = serde_json::to_string(&JsonRpcNotification {
240+
jsonrpc: TwoPointZero,
241+
method: self.method,
242+
params: JsonRpcNotificationParams { subscription: *sub_id, result: &*result },
243+
})?;
244+
245+
// Track broken connections
246+
if sender.unbounded_send(msg).is_err() {
247+
errored.push((*conn_id, *sub_id));
248+
}
249+
}
250+
251+
// Remove broken connections
252+
for entry in errored {
253+
subs.remove(&entry);
254+
}
255+
Ok(())
256+
}
257+
}

ws-server/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ futures-util = { version = "0.3", default-features = false, features = ["io"] }
1616
jsonrpsee-types = { path = "../types", version = "0.2.0-alpha.6" }
1717
jsonrpsee-utils = { path = "../utils", version = "0.2.0-alpha.6", features = ["server"] }
1818
log = "0.4"
19-
parking_lot = "0.11"
20-
rand = "0.8"
2119
rustc-hash = "1.1.0"
2220
serde = { version = "1", default-features = false, features = ["derive"] }
2321
serde_json = { version = "1", features = ["raw_value"] }

ws-server/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,5 @@ mod server;
3232
mod tests;
3333

3434
pub use jsonrpsee_types::error::Error;
35-
pub use server::{RpcContextModule, RpcModule, Server as WsServer, SubscriptionSink};
35+
pub use jsonrpsee_utils::server::rpc_module::{Methods, RpcContextModule, RpcModule, SubscriptionSink};
36+
pub use server::Server as WsServer;

0 commit comments

Comments
 (0)