From 4c2c2c025fe9302fa677422176af5082cba78d89 Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 16 May 2025 14:18:47 +0800 Subject: [PATCH 1/2] feat(torii-transport): custom transport with retries & user agent --- Cargo.toml | 1 + crates/transport/Cargo.toml | 13 ++ crates/transport/src/error.rs | 17 +++ crates/transport/src/lib.rs | 269 ++++++++++++++++++++++++++++++++++ 4 files changed, 300 insertions(+) create mode 100644 crates/transport/Cargo.toml create mode 100644 crates/transport/src/error.rs create mode 100644 crates/transport/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 53ca3df0..de4fef9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ torii-grpc-client = { path = "crates/grpc/client" } torii-grpc-server = { path = "crates/grpc/server" } torii-adigraphmap = { path = "crates/adigraphmap" } torii-task-network = { path = "crates/task-network" } +torii-transport = { path = "crates/transport" } # macros merge-options = { git = "https://github.com/dojoengine/dojo", rev = "82fe9bd" } diff --git a/crates/transport/Cargo.toml b/crates/transport/Cargo.toml new file mode 100644 index 00000000..c9defe4c --- /dev/null +++ b/crates/transport/Cargo.toml @@ -0,0 +1,13 @@ +[package] +edition.workspace = true +license.workspace = true +name = "torii-adigraphmap" +repository.workspace = true +version.workspace = true + +[dependencies] +thiserror.workspace = true +tracing.workspace = true +futures-util.workspace = true +tokio.workspace = true +reqwest.workspace = true diff --git a/crates/transport/src/error.rs b/crates/transport/src/error.rs new file mode 100644 index 00000000..a84da5eb --- /dev/null +++ b/crates/transport/src/error.rs @@ -0,0 +1,17 @@ +use thiserror::Error; + +/// Errors using [`HttpTransport`]. +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum HttpTransportError { + /// HTTP-related errors. + Reqwest(reqwest::Error), + /// JSON serialization/deserialization errors. + Json(serde_json::Error), + /// Unexpected response ID. + #[error("unexpected response ID: {0}")] + UnexpectedResponseId(u64), + /// Retries exhausted. + #[error("retries exhausted after {max_retries} attempts: {last_error}")] + RetriesExhausted { max_retries: u32, last_error: Box }, +} \ No newline at end of file diff --git a/crates/transport/src/lib.rs b/crates/transport/src/lib.rs new file mode 100644 index 00000000..7ddd6150 --- /dev/null +++ b/crates/transport/src/lib.rs @@ -0,0 +1,269 @@ +use async_trait::async_trait; +use log::trace; +use reqwest::{Client, Url}; +use serde::{de::DeserializeOwned, Serialize}; +use tokio::time; + +use crate::{ + jsonrpc::{transports::JsonRpcTransport, JsonRpcMethod, JsonRpcResponse}, + ProviderRequestData, +}; + +/// A [`JsonRpcTransport`] implementation that uses HTTP connections. +#[derive(Debug, Clone)] +pub struct HttpTransport { + client: Client, + url: Url, + headers: Vec<(String, String)>, + max_retries: u32, + retry_delay_ms: u64, +} + +#[derive(Debug, Serialize)] +struct JsonRpcRequest { + id: u64, + jsonrpc: &'static str, + method: JsonRpcMethod, + params: T, +} + +impl HttpTransport { + /// Constructs [`HttpTransport`] from a JSON-RPC server URL, using default HTTP client settings. + /// Defaults to 3 retries with a 500ms base delay. + /// + /// To use custom HTTP settings (e.g. proxy, timeout), use + /// [`new_with_client`](fn.new_with_client) instead. + pub fn new(url: impl Into) -> Self { + Self::new_with_client(url, Client::new()) + } + + /// Constructs [`HttpTransport`] from a JSON-RPC server URL and a custom `reqwest` client. + /// Defaults to 3 retries with a 500ms base delay. + pub fn new_with_client(url: impl Into, client: Client) -> Self { + Self { + client, + url: url.into(), + headers: vec![], + max_retries: 3, // Default max retries + retry_delay_ms: 500, // Default base delay in ms + } + } + + /// Sets the maximum number of retries for requests. + pub fn with_max_retries(mut self, max_retries: u32) -> Self { + self.max_retries = max_retries; + self + } + + /// Sets the base delay in milliseconds for exponential backoff. + pub fn with_retry_delay_ms(mut self, retry_delay_ms: u64) -> Self { + self.retry_delay_ms = retry_delay_ms; + self + } + + /// Consumes the current [`HttpTransport`] instance and returns a new one with the header + /// appended. Same as calling [`add_header`](fn.add_header). + pub fn with_header(self, name: String, value: String) -> Self { + let mut headers = self.headers; + headers.push((name, value)); + + Self { + client: self.client, + url: self.url, + headers, + max_retries: self.max_retries, + retry_delay_ms: self.retry_delay_ms, + } + } + + /// Adds a custom HTTP header to be sent for requests. + pub fn add_header(&mut self, name: String, value: String) { + self.headers.push((name, value)) + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl JsonRpcTransport for HttpTransport { + type Error = HttpTransportError; + + async fn send_request( + &self, + method: JsonRpcMethod, + params: P, + ) -> Result, Self::Error> + where + P: Serialize + Send, + R: DeserializeOwned, + { + let mut attempts = 0; + let mut last_error: Option = None; + + while attempts <= self.max_retries { + let request_body_data = JsonRpcRequest { + id: 1, + jsonrpc: "2.0", + method, + params: ¶ms, + }; + + let request_body_json = match serde_json::to_string(&request_body_data) { + Ok(json) => json, + Err(e) => return Err(HttpTransportError::Json(e)), + }; + + trace!("Sending request via JSON-RPC (attempt {}): {}", attempts + 1, request_body_json); + + let mut request_builder = self + .client + .post(self.url.clone()) + .body(request_body_json.clone()) + .header("Content-Type", "application/json"); + + for (name, value) in &self.headers { + request_builder = request_builder.header(name, value); + } + + match request_builder.send().await { + Ok(response) => { + let response_text = match response.text().await { + Ok(text) => text, + Err(e) => { + last_error = Some(HttpTransportError::Reqwest(e)); + attempts += 1; + if attempts <= self.max_retries && self.retry_delay_ms > 0 { + let delay = self.retry_delay_ms * (2u64.pow(attempts -1)); + time::sleep(time::Duration::from_millis(delay)).await; + } + continue; + } + }; + trace!("Response from JSON-RPC: {}", response_text); + match serde_json::from_str(&response_text) { + Ok(parsed) => return Ok(parsed), + Err(e) => return Err(HttpTransportError::Json(e)), + } + } + Err(e) => { + last_error = Some(HttpTransportError::Reqwest(e)); + attempts += 1; + if attempts <= self.max_retries && self.retry_delay_ms > 0 { + let delay = self.retry_delay_ms * (2u64.pow(attempts - 1)); + time::sleep(time::Duration::from_millis(delay)).await; + } + } + } + } + Err(HttpTransportError::RetriesExhausted { + max_retries: self.max_retries, + last_error: Box::new(last_error.unwrap_or_else(|| HttpTransportError::Reqwest(reqwest::Error::from(std::io::Error::new(std::io::ErrorKind::Other, "Unknown error during request processing"))))) + }) + } + + async fn send_requests( + &self, + requests_data: R, + ) -> Result>, Self::Error> + where + R: AsRef<[ProviderRequestData]> + Send + Sync, + { + let mut attempts = 0; + let mut last_error: Option = None; + + let original_request_bodies: Vec<_> = requests_data + .as_ref() + .iter() + .enumerate() + .map(|(ind, request_item)| JsonRpcRequest { + id: ind as u64, + jsonrpc: "2.0", + method: request_item.jsonrpc_method(), + params: request_item, + }) + .collect(); + + let request_count = original_request_bodies.len(); + + while attempts <= self.max_retries { + let request_body_json = match serde_json::to_string(&original_request_bodies) { + Ok(json) => json, + Err(e) => return Err(HttpTransportError::Json(e)), + }; + trace!("Sending batch request via JSON-RPC (attempt {}): {}", attempts + 1, request_body_json); + + let mut request_builder = self + .client + .post(self.url.clone()) + .body(request_body_json) + .header("Content-Type", "application/json"); + + for (name, value) in &self.headers { + request_builder = request_builder.header(name, value); + } + + match request_builder.send().await { + Ok(response) => { + let response_text = match response.text().await { + Ok(text) => text, + Err(e) => { + last_error = Some(HttpTransportError::Reqwest(e)); + attempts += 1; + if attempts <= self.max_retries && self.retry_delay_ms > 0 { + let delay = self.retry_delay_ms * (2u64.pow(attempts -1)); + time::sleep(time::Duration::from_millis(delay)).await; + } + continue; + } + }; + trace!("Response from JSON-RPC: {}", response_text); + + let parsed_response_batch: Vec> = + match serde_json::from_str(&response_text) { + Ok(parsed) => parsed, + Err(e) => return Err(HttpTransportError::Json(e)), + }; + + let mut responses_ordered: Vec>> = vec![]; + responses_ordered.resize(request_count, None); + + for response_item in parsed_response_batch { + let id = match &response_item { + JsonRpcResponse::Success { id, .. } | JsonRpcResponse::Error { id, .. } => { + *id as usize + } + }; + + if id >= request_count { + return Err(HttpTransportError::UnexpectedResponseId(id as u64)); + } + responses_ordered[id] = Some(response_item); + } + + if responses_ordered.iter().any(Option::is_none) { + last_error = Some(HttpTransportError::UnexpectedResponseId(request_count as u64)); + attempts += 1; + if attempts <= self.max_retries && self.retry_delay_ms > 0 { + let delay = self.retry_delay_ms * (2u64.pow(attempts -1)); + time::sleep(time::Duration::from_millis(delay)).await; + } + continue; + } + + return Ok(responses_ordered.into_iter().flatten().collect::>()); + } + Err(e) => { + last_error = Some(HttpTransportError::Reqwest(e)); + attempts += 1; + if attempts <= self.max_retries && self.retry_delay_ms > 0 { + let delay = self.retry_delay_ms * (2u64.pow(attempts-1)); + time::sleep(time::Duration::from_millis(delay)).await; + } + } + } + } + Err(HttpTransportError::RetriesExhausted { + max_retries: self.max_retries, + last_error: Box::new(last_error.unwrap_or_else(|| HttpTransportError::Reqwest(reqwest::Error::from(std::io::Error::new(std::io::ErrorKind::Other, "Unknown error during batch request processing"))))) + }) + } +} From f74f7447472f70fa03bda6673e04d797d3e6aef7 Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 16 May 2025 15:03:33 +0800 Subject: [PATCH 2/2] jsonrpc trnaport --- Cargo.lock | 15 +++++++++++++++ crates/runner/Cargo.toml | 1 + crates/transport/Cargo.toml | 7 +++++-- crates/transport/src/error.rs | 2 -- crates/transport/src/lib.rs | 10 +++++++--- 5 files changed, 28 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d4c0388c..083c1d1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11501,6 +11501,7 @@ dependencies = [ "torii-processors", "torii-server", "torii-sqlite", + "torii-transport", "tower", "tower-http", "tracing", @@ -11613,6 +11614,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "torii-transport" +version = "1.5.4-preview.0" +dependencies = [ + "async-trait", + "futures-util", + "reqwest", + "serde", + "serde_json", + "starknet 0.12.0", + "tokio", + "tracing", +] + [[package]] name = "torii-typed-data" version = "1.5.4-preview.0" diff --git a/crates/runner/Cargo.toml b/crates/runner/Cargo.toml index 3720334d..bf04d86f 100644 --- a/crates/runner/Cargo.toml +++ b/crates/runner/Cargo.toml @@ -46,6 +46,7 @@ torii-libp2p-relay = { workspace = true } torii-server.workspace = true torii-processors.workspace = true tower.workspace = true +torii-transport.workspace = true tempfile.workspace = true tower-http.workspace = true diff --git a/crates/transport/Cargo.toml b/crates/transport/Cargo.toml index c9defe4c..e9323107 100644 --- a/crates/transport/Cargo.toml +++ b/crates/transport/Cargo.toml @@ -1,13 +1,16 @@ [package] edition.workspace = true license.workspace = true -name = "torii-adigraphmap" +name = "torii-transport" repository.workspace = true version.workspace = true [dependencies] -thiserror.workspace = true tracing.workspace = true futures-util.workspace = true tokio.workspace = true reqwest.workspace = true +serde_json.workspace = true +starknet.workspace = true +serde.workspace = true +async-trait.workspace = true diff --git a/crates/transport/src/error.rs b/crates/transport/src/error.rs index a84da5eb..cb27c773 100644 --- a/crates/transport/src/error.rs +++ b/crates/transport/src/error.rs @@ -1,5 +1,3 @@ -use thiserror::Error; - /// Errors using [`HttpTransport`]. #[derive(Debug, thiserror::Error)] #[error(transparent)] diff --git a/crates/transport/src/lib.rs b/crates/transport/src/lib.rs index 7ddd6150..ac8bb372 100644 --- a/crates/transport/src/lib.rs +++ b/crates/transport/src/lib.rs @@ -1,14 +1,18 @@ use async_trait::async_trait; -use log::trace; +use tracing::trace; use reqwest::{Client, Url}; use serde::{de::DeserializeOwned, Serialize}; use tokio::time; -use crate::{ - jsonrpc::{transports::JsonRpcTransport, JsonRpcMethod, JsonRpcResponse}, +use starknet::providers::{ + jsonrpc::{JsonRpcTransport, JsonRpcMethod, JsonRpcResponse}, ProviderRequestData, }; +pub mod error; + +pub use error::HttpTransportError; + /// A [`JsonRpcTransport`] implementation that uses HTTP connections. #[derive(Debug, Clone)] pub struct HttpTransport {