diff --git a/Cargo.lock b/Cargo.lock index 69ece44..398dc64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1617,7 +1617,7 @@ dependencies = [ [[package]] name = "hyperware_process_lib" -version = "2.1.0" +version = "2.2.0" dependencies = [ "alloy", "alloy-primitives", @@ -1627,7 +1627,7 @@ dependencies = [ "base64", "bincode", "color-eyre", - "hex", + "futures-util", "http", "mime_guess", "rand 0.8.5", @@ -1635,12 +1635,12 @@ dependencies = [ "rmp-serde", "serde", "serde_json", - "sha3", "thiserror 1.0.69", "tracing", "tracing-error", "tracing-subscriber", "url", + "uuid", "wit-bindgen", ] @@ -3436,6 +3436,17 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "uuid" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" +dependencies = [ + "getrandom 0.3.3", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "valuable" version = "0.1.1" diff --git a/Cargo.toml b/Cargo.toml index 73bac30..7e15bf2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "hyperware_process_lib" authors = ["Sybil Technologies AG"] -version = "2.1.0" +version = "2.2.0" edition = "2021" description = "A library for writing Hyperware processes in Rust." homepage = "https://hyperware.ai" @@ -9,6 +9,7 @@ repository = "https://github.com/hyperware-ai/process_lib" license = "Apache-2.0" [features] +hyperapp = ["dep:futures-util", "dep:uuid", "logging"] logging = ["dep:color-eyre", "dep:tracing", "dep:tracing-error", "dep:tracing-subscriber"] hyperwallet = [] simulation-mode = [] @@ -29,8 +30,6 @@ alloy = { version = "0.8.1", features = [ anyhow = "1.0" base64 = "0.22.1" bincode = "1.3.3" -color-eyre = { version = "0.6", features = ["capture-spantrace"], optional = true } -hex = "0.4.3" http = "1.0.0" mime_guess = "2.0" rand = "0.8" @@ -38,10 +37,14 @@ regex = "1.11.1" rmp-serde = "1.1.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.120" -sha3 = "0.10.8" thiserror = "1.0" +url = "2.4.1" +wit-bindgen = "0.42.1" + +futures-util = { version = "0.3", optional = true } +uuid = { version = "1.0", features = ["v4"], optional = true } + +color-eyre = { version = "0.6", features = ["capture-spantrace"], optional = true } tracing = { version = "0.1", optional = true } tracing-error = { version = "0.2", optional = true } tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "std"], optional = true } -url = "2.4.1" -wit-bindgen = "0.42.1" diff --git a/src/http/client.rs b/src/http/client.rs index bf06b8c..9065c43 100644 --- a/src/http/client.rs +++ b/src/http/client.rs @@ -1,11 +1,16 @@ pub use super::server::{HttpResponse, WsMessageType}; -use crate::{get_blob, LazyLoadBlob as KiBlob, Message, Request as KiRequest}; +#[cfg(not(feature = "hyperapp"))] +use crate::Message; +use crate::{get_blob, LazyLoadBlob as KiBlob, Request as KiRequest}; use http::Method; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::str::FromStr; use thiserror::Error; +#[cfg(feature = "hyperapp")] +use crate::hyperapp; + /// [`crate::Request`] type sent to the `http-client:distro:sys` service in order to open a /// WebSocket connection, send a WebSocket message on an existing connection, or /// send an HTTP request. @@ -131,6 +136,7 @@ pub fn send_request( /// Make an HTTP request using http-client and await its response. /// /// Returns HTTP response from the `http` crate if successful, with the body type as bytes. +#[cfg(not(feature = "hyperapp"))] pub fn send_request_await_response( method: Method, url: url::Url, @@ -190,6 +196,64 @@ pub fn send_request_await_response( .unwrap()) } +/// Make an HTTP request using http-client and await its response. +/// +/// Returns HTTP response from the `http` crate if successful, with the body type as bytes. +#[cfg(feature = "hyperapp")] +pub async fn send_request_await_response( + method: Method, + url: url::Url, + headers: Option>, + timeout: u64, + body: Vec, +) -> std::result::Result>, HttpClientError> { + let request = KiRequest::to(("our", "http-client", "distro", "sys")) + .body( + serde_json::to_vec(&HttpClientAction::Http(OutgoingHttpRequest { + method: method.to_string(), + version: None, + url: url.to_string(), + headers: headers.unwrap_or_default(), + })) + .map_err(|_| HttpClientError::MalformedRequest)?, + ) + .blob_bytes(body) + .expects_response(timeout); + + let resp_result = + hyperapp::send::>(request) + .await + .map_err(|_| { + HttpClientError::ExecuteRequestFailed("http-client timed out".to_string()) + })?; + + let resp = match resp_result { + Ok(HttpClientResponse::Http(resp)) => resp, + Ok(HttpClientResponse::WebSocketAck) => { + return Err(HttpClientError::ExecuteRequestFailed( + "http-client gave unexpected response".to_string(), + )) + } + Err(e) => return Err(e), + }; + let mut http_response = http::Response::builder() + .status(http::StatusCode::from_u16(resp.status).unwrap_or_default()); + let headers = http_response.headers_mut().unwrap(); + for (key, value) in &resp.headers { + let Ok(key) = http::header::HeaderName::from_str(key) else { + continue; + }; + let Ok(value) = http::header::HeaderValue::from_str(value) else { + continue; + }; + headers.insert(key, value); + } + Ok(http_response + .body(get_blob().unwrap_or_default().bytes) + .unwrap()) +} + +#[cfg(not(feature = "hyperapp"))] pub fn open_ws_connection( url: String, headers: Option>, @@ -231,7 +295,37 @@ pub fn send_ws_client_push(channel_id: u32, message_type: WsMessageType, blob: K .unwrap() } +#[cfg(feature = "hyperapp")] +pub async fn open_ws_connection( + url: String, + headers: Option>, + channel_id: u32, +) -> std::result::Result<(), HttpClientError> { + let request = KiRequest::to(("our", "http-client", "distro", "sys")) + .body( + serde_json::to_vec(&HttpClientAction::WebSocketOpen { + url: url.clone(), + headers: headers.unwrap_or(HashMap::new()), + channel_id, + }) + .unwrap(), + ) + .expects_response(5); + + let resp_result = + hyperapp::send::>(request) + .await + .map_err(|_| HttpClientError::WsOpenFailed { url: url.clone() })?; + + match resp_result { + Ok(HttpClientResponse::WebSocketAck) => Ok(()), + Err(e) => Err(e), + _ => Err(HttpClientError::WsOpenFailed { url }), + } +} + /// Close a WebSocket connection. +#[cfg(not(feature = "hyperapp"))] pub fn close_ws_connection(channel_id: u32) -> std::result::Result<(), HttpClientError> { let Ok(Ok(Message::Response { body, .. })) = KiRequest::to(("our", "http-client", "distro", "sys")) @@ -251,3 +345,27 @@ pub fn close_ws_connection(channel_id: u32) -> std::result::Result<(), HttpClien _ => Err(HttpClientError::WsCloseFailed { channel_id }), } } + +/// Close a WebSocket connection. +#[cfg(feature = "hyperapp")] +pub async fn close_ws_connection(channel_id: u32) -> std::result::Result<(), HttpClientError> { + let request = KiRequest::to(("our", "http-client", "distro", "sys")) + .body( + serde_json::json!(HttpClientAction::WebSocketClose { channel_id }) + .to_string() + .as_bytes() + .to_vec(), + ) + .expects_response(5); + + let resp_result = + hyperapp::send::>(request) + .await + .map_err(|_| HttpClientError::WsCloseFailed { channel_id })?; + + match resp_result { + Ok(HttpClientResponse::WebSocketAck) => Ok(()), + Err(e) => Err(e), + _ => Err(HttpClientError::WsCloseFailed { channel_id }), + } +} diff --git a/src/hyperapp.rs b/src/hyperapp.rs new file mode 100644 index 0000000..7cd0439 --- /dev/null +++ b/src/hyperapp.rs @@ -0,0 +1,546 @@ +use std::cell::RefCell; +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::{ + get_state, http, + http::server::{HttpBindingConfig, HttpServer, IncomingHttpRequest, WsBindingConfig}, + logging::{error, info}, + set_state, timer, Address, BuildError, LazyLoadBlob, Message, Request, SendError, +}; +use futures_util::task::noop_waker_ref; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use uuid::Uuid; + +// macro_export puts it in the root, +// so we re-export here so you can use as either +// hyperware_process_lib::run_async +// or +// hyperware_process_lib::hyperapp::run_async +pub use crate::run_async; + +thread_local! { + pub static APP_CONTEXT: RefCell = RefCell::new(AppContext { + hidden_state: None, + executor: Executor::new(), + }); + + pub static RESPONSE_REGISTRY: RefCell>> = RefCell::new(HashMap::new()); + + pub static APP_HELPERS: RefCell = RefCell::new(AppHelpers { + current_server: None, + current_message: None, + current_http_context: None, + }); +} + +#[derive(Clone)] +pub struct HttpRequestContext { + pub request: IncomingHttpRequest, + pub response_headers: HashMap, +} + +pub struct AppContext { + pub hidden_state: Option, + pub executor: Executor, +} + +pub struct AppHelpers { + pub current_server: Option<*mut HttpServer>, + pub current_message: Option, + pub current_http_context: Option, +} + +// Access function for the current path +pub fn get_path() -> Option { + APP_HELPERS.with(|helpers| { + helpers + .borrow() + .current_http_context + .as_ref() + .and_then(|ctx| ctx.request.path().ok()) + }) +} + +// Access function for the current server +pub fn get_server() -> Option<&'static mut HttpServer> { + APP_HELPERS.with(|ctx| ctx.borrow().current_server.map(|ptr| unsafe { &mut *ptr })) +} + +pub fn get_http_method() -> Option { + APP_HELPERS.with(|helpers| { + helpers + .borrow() + .current_http_context + .as_ref() + .and_then(|ctx| ctx.request.method().ok()) + .map(|m| m.to_string()) + }) +} + +// Set response headers that will be included in the HTTP response +pub fn set_response_headers(headers: HashMap) { + APP_HELPERS.with(|helpers| { + if let Some(ctx) = &mut helpers.borrow_mut().current_http_context { + ctx.response_headers = headers; + } + }) +} + +// Add a single response header +pub fn add_response_header(key: String, value: String) { + APP_HELPERS.with(|helpers| { + if let Some(ctx) = &mut helpers.borrow_mut().current_http_context { + ctx.response_headers.insert(key, value); + } + }) +} + +pub fn clear_http_request_context() { + APP_HELPERS.with(|helpers| { + helpers.borrow_mut().current_http_context = None; + }) +} + +// Access function for the source address of the current message +pub fn source() -> Address { + APP_HELPERS.with(|ctx| { + ctx.borrow() + .current_message + .as_ref() + .expect("No message in current context") + .source() + .clone() + }) +} + +/// Get query parameters from the current HTTP request path +/// Returns None if not in an HTTP context or no query parameters present +pub fn get_query_params() -> Option> { + get_path().map(|path| { + let mut params = HashMap::new(); + if let Some(query_start) = path.find('?') { + let query = &path[query_start + 1..]; + for pair in query.split('&') { + if let Some(eq_pos) = pair.find('=') { + let key = pair[..eq_pos].to_string(); + let value = pair[eq_pos + 1..].to_string(); + params.insert(key, value); + } + } + } + params + }) +} + +pub struct Executor { + tasks: Vec>>>, +} + +impl Executor { + pub fn new() -> Self { + Self { tasks: Vec::new() } + } + + pub fn spawn(&mut self, fut: impl Future + 'static) { + self.tasks.push(Box::pin(fut)); + } + + pub fn poll_all_tasks(&mut self) { + let mut ctx = Context::from_waker(noop_waker_ref()); + let mut completed = Vec::new(); + + for i in 0..self.tasks.len() { + if let Poll::Ready(()) = self.tasks[i].as_mut().poll(&mut ctx) { + completed.push(i); + } + } + + for idx in completed.into_iter().rev() { + let _ = self.tasks.remove(idx); + } + } +} +struct ResponseFuture { + correlation_id: String, + // Capture HTTP context at creation time + http_context: Option, +} + +impl ResponseFuture { + fn new(correlation_id: String) -> Self { + // Capture current HTTP context when future is created (at .await point) + let http_context = + APP_HELPERS.with(|helpers| helpers.borrow().current_http_context.clone()); + + Self { + correlation_id, + http_context, + } + } +} + +impl Future for ResponseFuture { + type Output = Vec; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + let correlation_id = &self.correlation_id; + + let maybe_bytes = RESPONSE_REGISTRY.with(|registry| { + let mut registry_mut = registry.borrow_mut(); + registry_mut.remove(correlation_id) + }); + + if let Some(bytes) = maybe_bytes { + // Restore this future's captured context + if let Some(ref context) = self.http_context { + APP_HELPERS.with(|helpers| { + helpers.borrow_mut().current_http_context = Some(context.clone()); + }); + } + + Poll::Ready(bytes) + } else { + Poll::Pending + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, Error)] +pub enum AppSendError { + #[error("SendError: {0}")] + SendError(SendError), + #[error("BuildError: {0}")] + BuildError(BuildError), +} + +pub async fn sleep(sleep_ms: u64) -> Result<(), AppSendError> { + let request = Request::to(("our", "timer", "distro", "sys")) + .body(timer::TimerAction::SetTimer(sleep_ms)) + .expects_response((sleep_ms / 1_000) + 1); + + let correlation_id = Uuid::new_v4().to_string(); + if let Err(e) = request.context(correlation_id.as_bytes().to_vec()).send() { + return Err(AppSendError::BuildError(e)); + } + + let _ = ResponseFuture::new(correlation_id).await; + + return Ok(()); +} + +pub async fn send(request: Request) -> Result +where + R: serde::de::DeserializeOwned, +{ + let request = if request.timeout.is_some() { + request + } else { + request.expects_response(30) + }; + + let correlation_id = Uuid::new_v4().to_string(); + if let Err(e) = request.context(correlation_id.as_bytes().to_vec()).send() { + return Err(AppSendError::BuildError(e)); + } + + let response_bytes = ResponseFuture::new(correlation_id).await; + if let Ok(r) = serde_json::from_slice::(&response_bytes) { + return Ok(r); + } + + let e = serde_json::from_slice::(&response_bytes) + .expect("Failed to deserialize response to send()"); + return Err(AppSendError::SendError(e)); +} + +pub async fn send_rmp(request: Request) -> Result +where + R: serde::de::DeserializeOwned, +{ + let request = if request.timeout.is_some() { + request + } else { + request.expects_response(30) + }; + + let correlation_id = Uuid::new_v4().to_string(); + if let Err(e) = request.context(correlation_id.as_bytes().to_vec()).send() { + return Err(AppSendError::BuildError(e)); + } + + let response_bytes = ResponseFuture::new(correlation_id).await; + if let Ok(r) = rmp_serde::from_slice::(&response_bytes) { + return Ok(r); + } + + let e = rmp_serde::from_slice::(&response_bytes) + .expect("Failed to deserialize response to send()"); + return Err(AppSendError::SendError(e)); +} + +#[macro_export] +macro_rules! run_async { + ($($code:tt)*) => { + hyperware_process_lib::hyperapp::APP_CONTEXT.with(|ctx| { + ctx.borrow_mut().executor.spawn(async move { + $($code)* + }) + }) + }; +} + +// Enum defining the state persistance behaviour +#[derive(Clone)] +pub enum SaveOptions { + // Never Persist State + Never, + // Persist State Every Message + EveryMessage, + // Persist State Every N Messages + EveryNMessage(u64), + // Persist State Every N Seconds + EveryNSeconds(u64), + // Persist State Only If Changed + OnDiff, +} +pub struct HiddenState { + save_config: SaveOptions, + message_count: u64, + old_state: Option>, // Stores the serialized state from before message processing +} + +impl HiddenState { + pub fn new(save_config: SaveOptions) -> Self { + Self { + save_config, + message_count: 0, + old_state: None, + } + } + + fn should_save_state(&mut self) -> bool { + match self.save_config { + SaveOptions::Never => false, + SaveOptions::EveryMessage => true, + SaveOptions::EveryNMessage(n) => { + self.message_count += 1; + if self.message_count >= n { + self.message_count = 0; + true + } else { + false + } + } + SaveOptions::EveryNSeconds(_) => false, // Handled by timer instead + SaveOptions::OnDiff => false, // Will be handled separately with state comparison + } + } +} + +// TODO: We need a timer macro again. + +/// Store a snapshot of the current state before processing a message +/// This is used for OnDiff save option to compare state before and after +/// Only stores if old_state is None (i.e., first time or after a save) +pub fn store_old_state(state: &S) +where + S: serde::Serialize, +{ + APP_CONTEXT.with(|ctx| { + let mut ctx_mut = ctx.borrow_mut(); + if let Some(ref mut hidden_state) = ctx_mut.hidden_state { + if matches!(hidden_state.save_config, SaveOptions::OnDiff) + && hidden_state.old_state.is_none() + { + if let Ok(s_bytes) = rmp_serde::to_vec(state) { + hidden_state.old_state = Some(s_bytes); + } + } + } + }); +} + +/// Trait that must be implemented by application state types +pub trait State { + /// Creates a new instance of the state. + fn new() -> Self; +} + +/// Initialize state from persisted storage or create new if none exists +/// TODO: Delete? +pub fn initialize_state() -> S +where + S: serde::de::DeserializeOwned + Default, +{ + match get_state() { + Some(bytes) => match rmp_serde::from_slice::(&bytes) { + Ok(state) => state, + Err(e) => { + panic!("error deserializing existing state: {e}. We're panicking because we don't want to nuke state by setting it to a new instance."); + } + }, + None => { + info!("no existing state, creating new one"); + S::default() + } + } +} + +pub fn setup_server( + ui_config: Option<&HttpBindingConfig>, + endpoints: &[Binding], +) -> http::server::HttpServer { + let mut server = http::server::HttpServer::new(5); + + if let Some(ui) = ui_config { + if let Err(e) = server.serve_ui("ui", vec!["/"], ui.clone()) { + panic!("failed to serve UI: {e}. Make sure that a ui folder is in /pkg"); + } + } + + // Verify no duplicate paths + let mut seen_paths = std::collections::HashSet::new(); + for endpoint in endpoints.iter() { + let path = match endpoint { + Binding::Http { path, .. } => path, + Binding::Ws { path, .. } => path, + }; + if !seen_paths.insert(path) { + panic!("duplicate path found: {}", path); + } + } + + for endpoint in endpoints { + match endpoint { + Binding::Http { path, config } => { + server + .bind_http_path(path.to_string(), config.clone()) + .expect("failed to serve API path"); + } + Binding::Ws { path, config } => { + server + .bind_ws_path(path.to_string(), config.clone()) + .expect("failed to bind WS path"); + } + } + } + + server +} + +/// Pretty prints a SendError in a more readable format +pub fn pretty_print_send_error(error: &SendError) { + let kind = &error.kind; + let target = &error.target; + + // Try to decode body as UTF-8 string, fall back to showing as bytes + let body = String::from_utf8(error.message.body().to_vec()) + .map(|s| format!("\"{}\"", s)) + .unwrap_or_else(|_| format!("{:?}", error.message.body())); + + // Try to decode context as UTF-8 string + let context = error + .context + .as_ref() + .map(|bytes| String::from_utf8_lossy(bytes).into_owned()); + + error!( + "SendError {{ + kind: {:?}, + target: {}, + body: {}, + context: {} +}}", + kind, + target, + body, + context + .map(|s| format!("\"{}\"", s)) + .unwrap_or("None".to_string()) + ); +} + +// For demonstration, we'll define them all in one place. +// Make sure the signatures match the real function signatures you require! +pub fn no_init_fn(_state: &mut S) { + // does nothing +} + +pub fn no_ws_handler( + _state: &mut S, + _server: &mut http::server::HttpServer, + _channel_id: u32, + _msg_type: http::server::WsMessageType, + _blob: LazyLoadBlob, +) { + // does nothing +} + +pub fn no_http_api_call(_state: &mut S, _req: ()) { + // does nothing +} + +pub fn no_local_request(_msg: &Message, _state: &mut S, _req: ()) { + // does nothing +} + +pub fn no_remote_request(_msg: &Message, _state: &mut S, _req: ()) { + // does nothing +} + +#[derive(Clone, Debug)] +pub enum Binding { + Http { + path: &'static str, + config: HttpBindingConfig, + }, + Ws { + path: &'static str, + config: WsBindingConfig, + }, +} + +pub fn maybe_save_state(state: &S) +where + S: serde::Serialize, +{ + APP_CONTEXT.with(|ctx| { + let mut ctx_mut = ctx.borrow_mut(); + if let Some(ref mut hidden_state) = ctx_mut.hidden_state { + let should_save = if matches!(hidden_state.save_config, SaveOptions::OnDiff) { + // For OnDiff, compare current state with old state + if let Ok(current_bytes) = rmp_serde::to_vec(state) { + let state_changed = match &hidden_state.old_state { + Some(old_bytes) => old_bytes != ¤t_bytes, + None => true, // If no old state, consider it changed + }; + + if state_changed { + true + } else { + false + } + } else { + false + } + } else { + hidden_state.should_save_state() + }; + + if should_save { + if let Ok(s_bytes) = rmp_serde::to_vec(state) { + let _ = set_state(&s_bytes); + + // Clear old_state after saving so it can be set again on next message + if matches!(hidden_state.save_config, SaveOptions::OnDiff) { + hidden_state.old_state = None; + } + } + } + } + }); +} diff --git a/src/hypermap.rs b/src/hypermap.rs index 5e7b0af..5bf4aa5 100644 --- a/src/hypermap.rs +++ b/src/hypermap.rs @@ -372,6 +372,7 @@ pub fn namehash(name: &str) -> String { /// Decode a mint log from the hypermap into a 'resolved' format. /// /// Uses [`valid_name()`] to check if the name is valid. +#[cfg(not(feature = "hyperapp"))] pub fn decode_mint_log(log: &crate::eth::Log) -> Result { let contract::Note::SIGNATURE_HASH = log.topics()[0] else { return Err(DecodeLogError::UnexpectedTopic(log.topics()[0])); @@ -388,9 +389,30 @@ pub fn decode_mint_log(log: &crate::eth::Log) -> Result { } } +/// Decode a mint log from the hypermap into a 'resolved' format. +/// +/// Uses [`valid_name()`] to check if the name is valid. +#[cfg(feature = "hyperapp")] +pub async fn decode_mint_log(log: &crate::eth::Log) -> Result { + let contract::Note::SIGNATURE_HASH = log.topics()[0] else { + return Err(DecodeLogError::UnexpectedTopic(log.topics()[0])); + }; + let decoded = contract::Mint::decode_log_data(log.data(), true) + .map_err(|e| DecodeLogError::DecodeError(e.to_string()))?; + let name = String::from_utf8_lossy(&decoded.label).to_string(); + if !valid_name(&name) { + return Err(DecodeLogError::InvalidName(name)); + } + match resolve_parent(log, None).await { + Some(parent_path) => Ok(Mint { name, parent_path }), + None => Err(DecodeLogError::UnresolvedParent(name)), + } +} + /// Decode a note log from the hypermap into a 'resolved' format. /// /// Uses [`valid_name()`] to check if the name is valid. +#[cfg(not(feature = "hyperapp"))] pub fn decode_note_log(log: &crate::eth::Log) -> Result { let contract::Note::SIGNATURE_HASH = log.topics()[0] else { return Err(DecodeLogError::UnexpectedTopic(log.topics()[0])); @@ -411,6 +433,31 @@ pub fn decode_note_log(log: &crate::eth::Log) -> Result { } } +/// Decode a note log from the hypermap into a 'resolved' format. +/// +/// Uses [`valid_name()`] to check if the name is valid. +#[cfg(feature = "hyperapp")] +pub async fn decode_note_log(log: &crate::eth::Log) -> Result { + let contract::Note::SIGNATURE_HASH = log.topics()[0] else { + return Err(DecodeLogError::UnexpectedTopic(log.topics()[0])); + }; + let decoded = contract::Note::decode_log_data(log.data(), true) + .map_err(|e| DecodeLogError::DecodeError(e.to_string()))?; + let note = String::from_utf8_lossy(&decoded.label).to_string(); + if !valid_note(¬e) { + return Err(DecodeLogError::InvalidName(note)); + } + match resolve_parent(log, None).await { + Some(parent_path) => Ok(Note { + note, + parent_path, + data: decoded.data, + }), + None => Err(DecodeLogError::UnresolvedParent(note)), + } +} + +#[cfg(not(feature = "hyperapp"))] pub fn decode_fact_log(log: &crate::eth::Log) -> Result { let contract::Fact::SIGNATURE_HASH = log.topics()[0] else { return Err(DecodeLogError::UnexpectedTopic(log.topics()[0])); @@ -431,17 +478,48 @@ pub fn decode_fact_log(log: &crate::eth::Log) -> Result { } } +#[cfg(feature = "hyperapp")] +pub async fn decode_fact_log(log: &crate::eth::Log) -> Result { + let contract::Fact::SIGNATURE_HASH = log.topics()[0] else { + return Err(DecodeLogError::UnexpectedTopic(log.topics()[0])); + }; + let decoded = contract::Fact::decode_log_data(log.data(), true) + .map_err(|e| DecodeLogError::DecodeError(e.to_string()))?; + let fact = String::from_utf8_lossy(&decoded.label).to_string(); + if !valid_fact(&fact) { + return Err(DecodeLogError::InvalidName(fact)); + } + match resolve_parent(log, None).await { + Some(parent_path) => Ok(Fact { + fact, + parent_path, + data: decoded.data, + }), + None => Err(DecodeLogError::UnresolvedParent(fact)), + } +} + /// Given a [`crate::eth::Log`] (which must be a log from hypermap), resolve the parent name /// of the new entry or note. +#[cfg(not(feature = "hyperapp"))] pub fn resolve_parent(log: &crate::eth::Log, timeout: Option) -> Option { let parent_hash = log.topics()[1].to_string(); net::get_name(&parent_hash, log.block_number, timeout) } +/// Given a [`crate::eth::Log`] (which must be a log from hypermap), resolve the parent name +/// of the new entry or note. +#[cfg(feature = "hyperapp")] +pub async fn resolve_parent(log: &crate::eth::Log, timeout: Option) -> Option { + let parent_hash = log.topics()[1].to_string(); + net::get_name(&parent_hash, log.block_number, timeout).await +} + /// Given a [`crate::eth::Log`] (which must be a log from hypermap), resolve the full name /// of the new entry or note. /// /// Uses [`valid_name()`] to check if the name is valid. +#[cfg(not(feature = "hyperapp"))] pub fn resolve_full_name(log: &crate::eth::Log, timeout: Option) -> Option { let parent_hash = log.topics()[1].to_string(); let parent_name = net::get_name(&parent_hash, log.block_number, timeout)?; @@ -471,6 +549,40 @@ pub fn resolve_full_name(log: &crate::eth::Log, timeout: Option) -> Option< Some(format!("{name}.{parent_name}")) } +/// Given a [`crate::eth::Log`] (which must be a log from hypermap), resolve the full name +/// of the new entry or note. +/// +/// Uses [`valid_name()`] to check if the name is valid. +#[cfg(feature = "hyperapp")] +pub async fn resolve_full_name(log: &crate::eth::Log, timeout: Option) -> Option { + let parent_hash = log.topics()[1].to_string(); + let parent_name = net::get_name(&parent_hash, log.block_number, timeout).await?; + let log_name = match log.topics()[0] { + contract::Mint::SIGNATURE_HASH => { + let decoded = contract::Mint::decode_log_data(log.data(), true).unwrap(); + decoded.label + } + contract::Note::SIGNATURE_HASH => { + let decoded = contract::Note::decode_log_data(log.data(), true).unwrap(); + decoded.label + } + contract::Fact::SIGNATURE_HASH => { + let decoded = contract::Fact::decode_log_data(log.data(), true).unwrap(); + decoded.label + } + _ => return None, + }; + let name = String::from_utf8_lossy(&log_name); + if !valid_entry( + &name, + log.topics()[0] == contract::Note::SIGNATURE_HASH, + log.topics()[0] == contract::Fact::SIGNATURE_HASH, + ) { + return None; + } + Some(format!("{name}.{parent_name}")) +} + pub fn eth_apply_filter(logs: &[EthLog], filter: &EthFilter) -> Vec { let mut matched_logs = Vec::new(); @@ -947,6 +1059,7 @@ impl Hypermap { )) } + #[cfg(not(feature = "hyperapp"))] pub fn validate_log_cache(&self, log_cache: &LogCache) -> anyhow::Result { let from_block = log_cache.metadata.from_block.parse::().map_err(|_| { anyhow::anyhow!( @@ -978,6 +1091,40 @@ impl Hypermap { )?) } + #[cfg(feature = "hyperapp")] + pub async fn validate_log_cache(&self, log_cache: &LogCache) -> anyhow::Result { + let from_block = log_cache.metadata.from_block.parse::().map_err(|_| { + anyhow::anyhow!( + "Invalid from_block in metadata: {}", + log_cache.metadata.from_block + ) + })?; + let to_block = log_cache.metadata.to_block.parse::().map_err(|_| { + anyhow::anyhow!( + "Invalid to_block in metadata: {}", + log_cache.metadata.to_block + ) + })?; + + let mut bytes_to_verify = serde_json::to_vec(&log_cache.logs) + .map_err(|e| anyhow::anyhow!("Failed to serialize logs for validation: {:?}", e))?; + bytes_to_verify.extend_from_slice(&from_block.to_be_bytes()); + bytes_to_verify.extend_from_slice(&to_block.to_be_bytes()); + let hashed_data = keccak256(&bytes_to_verify); + + let signature_hex = log_cache.metadata.signature.trim_start_matches("0x"); + let signature_bytes = hex::decode(signature_hex) + .map_err(|e| anyhow::anyhow!("Failed to decode hex signature: {:?}", e))?; + + Ok(sign::net_key_verify( + hashed_data.to_vec(), + &log_cache.metadata.created_by.parse::()?, + signature_bytes, + ) + .await?) + } + + #[cfg(not(feature = "hyperapp"))] pub fn get_bootstrap( &self, from_block: Option, @@ -1057,6 +1204,89 @@ impl Hypermap { Ok((block, unique_logs)) } + #[cfg(feature = "hyperapp")] + pub async fn get_bootstrap( + &self, + from_block: Option, + retry_params: Option<(u64, Option)>, + chain: Option, + ) -> anyhow::Result<(u64, Vec)> { + print_to_terminal( + 2, + &format!( + "get_bootstrap: from_block={:?}, retry_params={:?}, chain={:?}", + from_block, retry_params, chain, + ), + ); + let (block, log_caches) = self.get_bootstrap_log_cache(from_block, retry_params, chain)?; + + let mut all_valid_logs: Vec = Vec::new(); + let request_from_block_val = from_block.unwrap_or(0); + + for log_cache in log_caches { + match self.validate_log_cache(&log_cache).await { + Ok(true) => { + for log in log_cache.logs { + if let Some(log_block_number) = log.block_number { + if log_block_number >= request_from_block_val { + all_valid_logs.push(log); + } + } else { + if from_block.is_none() { + all_valid_logs.push(log); + } + } + } + } + Ok(false) => { + print_to_terminal( + 1, + &format!("LogCache validation failed for cache created by {}. Discarding {} logs.", + log_cache.metadata.created_by, + log_cache.logs.len()) + ); + } + Err(e) => { + print_to_terminal( + 1, + &format!( + "Error validating LogCache from {}: {:?}. Discarding {} logs.", + log_cache.metadata.created_by, + e, + log_cache.logs.len() + ), + ); + } + } + } + + all_valid_logs.sort_by(|a, b| { + let block_cmp = a.block_number.cmp(&b.block_number); + if block_cmp == std::cmp::Ordering::Equal { + std::cmp::Ordering::Equal + } else { + block_cmp + } + }); + + let mut unique_logs = Vec::new(); + for log in all_valid_logs { + if !unique_logs.contains(&log) { + unique_logs.push(log); + } + } + + print_to_terminal( + 2, + &format!( + "get_bootstrap: Consolidated {} unique logs.", + unique_logs.len(), + ), + ); + Ok((block, unique_logs)) + } + + #[cfg(not(feature = "hyperapp"))] pub fn bootstrap( &self, from_block: Option, @@ -1097,6 +1327,45 @@ impl Hypermap { ); Ok((block, results_per_filter)) } + + #[cfg(feature = "hyperapp")] + pub async fn bootstrap( + &self, + from_block: Option, + filters: Vec, + retry_params: Option<(u64, Option)>, + chain: Option, + ) -> anyhow::Result<(u64, Vec>)> { + print_to_terminal( + 2, + &format!( + "bootstrap: from_block={:?}, filters={:?}, retry_params={:?}, chain={:?}", + from_block, filters, retry_params, chain, + ), + ); + let (block, consolidated_logs) = + self.get_bootstrap(from_block, retry_params, chain).await?; + + if consolidated_logs.is_empty() { + print_to_terminal(2,"bootstrap: No logs retrieved after consolidation. Returning empty results for filters."); + return Ok((block, filters.iter().map(|_| Vec::new()).collect())); + } + + let mut results_per_filter: Vec> = Vec::new(); + for filter in filters { + let filtered_logs = eth_apply_filter(&consolidated_logs, &filter); + results_per_filter.push(filtered_logs); + } + + print_to_terminal( + 2, + &format!( + "bootstrap: Applied {} filters to bootstrapped logs.", + results_per_filter.len(), + ), + ); + Ok((block, results_per_filter)) + } } impl Serialize for ManifestItem { diff --git a/src/kv.rs b/src/kv.rs index 71611b4..f4178b6 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -1,8 +1,18 @@ -use crate::{get_blob, Message, PackageId, Request}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use crate::PackageId; +use serde::{Deserialize, Serialize}; use std::marker::PhantomData; use thiserror::Error; +#[cfg(not(feature = "hyperapp"))] +mod kv_sync; +#[cfg(not(feature = "hyperapp"))] +pub use kv_sync::{open, open_raw, remove_db}; + +#[cfg(feature = "hyperapp")] +mod kv_async; +#[cfg(feature = "hyperapp")] +pub use kv_async::{open, open_raw, remove_db}; + /// Actions are sent to a specific key value database. `db` is the name, /// `package_id` is the [`PackageId`] that created the database. Capabilities /// are checked: you can access another process's database if it has given @@ -157,409 +167,3 @@ pub struct Kv { pub timeout: u64, _marker: PhantomData<(K, V)>, } - -impl Kv -where - K: Serialize + DeserializeOwned, - V: Serialize + DeserializeOwned, -{ - /// Get a value. - pub fn get(&self, key: &K) -> anyhow::Result { - let key = serde_json::to_vec(key)?; - let res = Request::new() - .target(("our", "kv", "distro", "sys")) - .body(serde_json::to_vec(&KvRequest { - package_id: self.package_id.clone(), - db: self.db.clone(), - action: KvAction::Get(key), - })?) - .send_and_await_response(self.timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - KvResponse::Get { .. } => { - let bytes = match get_blob() { - Some(bytes) => bytes.bytes, - None => return Err(anyhow::anyhow!("kv: no blob")), - }; - let value = serde_json::from_slice::(&bytes) - .map_err(|e| anyhow::anyhow!("Failed to deserialize value: {}", e))?; - Ok(value) - } - KvResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), - } - } - _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), - } - } - - /// Get a value as a different type T - pub fn get_as(&self, key: &K) -> anyhow::Result - where - T: DeserializeOwned, - { - let key = serde_json::to_vec(key)?; - let res = Request::new() - .target(("our", "kv", "distro", "sys")) - .body(serde_json::to_vec(&KvRequest { - package_id: self.package_id.clone(), - db: self.db.clone(), - action: KvAction::Get(key), - })?) - .send_and_await_response(self.timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - KvResponse::Get { .. } => { - let bytes = match get_blob() { - Some(bytes) => bytes.bytes, - None => return Err(anyhow::anyhow!("kv: no blob")), - }; - let value = serde_json::from_slice::(&bytes) - .map_err(|e| anyhow::anyhow!("Failed to deserialize value: {}", e))?; - Ok(value) - } - KvResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), - } - } - _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), - } - } - - /// Set a value, optionally in a transaction. - pub fn set(&self, key: &K, value: &V, tx_id: Option) -> anyhow::Result<()> { - let key = serde_json::to_vec(key)?; - let value = serde_json::to_vec(value)?; - - let res = Request::new() - .target(("our", "kv", "distro", "sys")) - .body(serde_json::to_vec(&KvRequest { - package_id: self.package_id.clone(), - db: self.db.clone(), - action: KvAction::Set { key, tx_id }, - })?) - .blob_bytes(value) - .send_and_await_response(self.timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - KvResponse::Ok => Ok(()), - KvResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), - } - } - _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), - } - } - - /// Set a value as a different type T - pub fn set_as(&self, key: &K, value: &T, tx_id: Option) -> anyhow::Result<()> - where - T: Serialize, - { - let key = serde_json::to_vec(key)?; - let value = serde_json::to_vec(value)?; - - let res = Request::new() - .target(("our", "kv", "distro", "sys")) - .body(serde_json::to_vec(&KvRequest { - package_id: self.package_id.clone(), - db: self.db.clone(), - action: KvAction::Set { key, tx_id }, - })?) - .blob_bytes(value) - .send_and_await_response(self.timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - KvResponse::Ok => Ok(()), - KvResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), - } - } - _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), - } - } - - /// Delete a value, optionally in a transaction. - pub fn delete(&self, key: &K, tx_id: Option) -> anyhow::Result<()> { - let key = serde_json::to_vec(key)?; - let res = Request::new() - .target(("our", "kv", "distro", "sys")) - .body(serde_json::to_vec(&KvRequest { - package_id: self.package_id.clone(), - db: self.db.clone(), - action: KvAction::Delete { key, tx_id }, - })?) - .send_and_await_response(self.timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - KvResponse::Ok => Ok(()), - KvResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), - } - } - _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), - } - } - - /// Delete a value with a different key type - pub fn delete_as(&self, key: &T, tx_id: Option) -> anyhow::Result<()> - where - T: Serialize, - { - let key = serde_json::to_vec(key)?; - - let res = Request::new() - .target(("our", "kv", "distro", "sys")) - .body(serde_json::to_vec(&KvRequest { - package_id: self.package_id.clone(), - db: self.db.clone(), - action: KvAction::Delete { key, tx_id }, - })?) - .send_and_await_response(self.timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - KvResponse::Ok => Ok(()), - KvResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), - } - } - _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), - } - } - - /// Begin a transaction. - pub fn begin_tx(&self) -> anyhow::Result { - let res = Request::new() - .target(("our", "kv", "distro", "sys")) - .body(serde_json::to_vec(&KvRequest { - package_id: self.package_id.clone(), - db: self.db.clone(), - action: KvAction::BeginTx, - })?) - .send_and_await_response(self.timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - KvResponse::BeginTx { tx_id } => Ok(tx_id), - KvResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), - } - } - _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), - } - } - - /// Commit a transaction. - pub fn commit_tx(&self, tx_id: u64) -> anyhow::Result<()> { - let res = Request::new() - .target(("our", "kv", "distro", "sys")) - .body(serde_json::to_vec(&KvRequest { - package_id: self.package_id.clone(), - db: self.db.clone(), - action: KvAction::Commit { tx_id }, - })?) - .send_and_await_response(self.timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - KvResponse::Ok => Ok(()), - KvResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), - } - } - _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), - } - } -} - -impl Kv, Vec> { - /// Get raw bytes directly - pub fn get_raw(&self, key: &[u8]) -> anyhow::Result> { - let res = Request::new() - .target(("our", "kv", "distro", "sys")) - .body(serde_json::to_vec(&KvRequest { - package_id: self.package_id.clone(), - db: self.db.clone(), - action: KvAction::Get(key.to_vec()), - })?) - .send_and_await_response(self.timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - KvResponse::Get { .. } => { - let bytes = match get_blob() { - Some(bytes) => bytes.bytes, - None => return Err(anyhow::anyhow!("kv: no blob")), - }; - Ok(bytes) - } - KvResponse::Err { 0: error } => Err(error.into()), - _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), - } - } - _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), - } - } - - /// Set raw bytes directly - pub fn set_raw(&self, key: &[u8], value: &[u8], tx_id: Option) -> anyhow::Result<()> { - let res = Request::new() - .target(("our", "kv", "distro", "sys")) - .body(serde_json::to_vec(&KvRequest { - package_id: self.package_id.clone(), - db: self.db.clone(), - action: KvAction::Set { - key: key.to_vec(), - tx_id, - }, - })?) - .blob_bytes(value.to_vec()) - .send_and_await_response(self.timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - KvResponse::Ok => Ok(()), - KvResponse::Err { 0: error } => Err(error.into()), - _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), - } - } - _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), - } - } - - /// Delete raw bytes directly - pub fn delete_raw(&self, key: &[u8], tx_id: Option) -> anyhow::Result<()> { - let res = Request::new() - .target(("our", "kv", "distro", "sys")) - .body(serde_json::to_vec(&KvRequest { - package_id: self.package_id.clone(), - db: self.db.clone(), - action: KvAction::Delete { - key: key.to_vec(), - tx_id, - }, - })?) - .send_and_await_response(self.timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - KvResponse::Ok => Ok(()), - KvResponse::Err { 0: error } => Err(error.into()), - _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), - } - } - _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), - } - } -} - -/// Helper function to open a raw bytes key-value store -pub fn open_raw( - package_id: PackageId, - db: &str, - timeout: Option, -) -> anyhow::Result, Vec>> { - open(package_id, db, timeout) -} - -/// Opens or creates a kv db. -pub fn open(package_id: PackageId, db: &str, timeout: Option) -> anyhow::Result> -where - K: Serialize + DeserializeOwned, - V: Serialize + DeserializeOwned, -{ - let timeout = timeout.unwrap_or(5); - - let res = Request::new() - .target(("our", "kv", "distro", "sys")) - .body(serde_json::to_vec(&KvRequest { - package_id: package_id.clone(), - db: db.to_string(), - action: KvAction::Open, - })?) - .send_and_await_response(timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - KvResponse::Ok => Ok(Kv { - package_id, - db: db.to_string(), - timeout, - _marker: PhantomData, - }), - KvResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), - } - } - _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), - } -} - -/// Removes and deletes a kv db. -pub fn remove_db(package_id: PackageId, db: &str, timeout: Option) -> anyhow::Result<()> { - let timeout = timeout.unwrap_or(5); - - let res = Request::new() - .target(("our", "kv", "distro", "sys")) - .body(serde_json::to_vec(&KvRequest { - package_id: package_id.clone(), - db: db.to_string(), - action: KvAction::RemoveDb, - })?) - .send_and_await_response(timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - KvResponse::Ok => Ok(()), - KvResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), - } - } - _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), - } -} diff --git a/src/kv/kv_async.rs b/src/kv/kv_async.rs new file mode 100644 index 0000000..f6d64c3 --- /dev/null +++ b/src/kv/kv_async.rs @@ -0,0 +1,258 @@ +use crate::{ + get_blob, hyperapp, + kv::{Kv, KvAction, KvRequest, KvResponse}, + PackageId, Request, +}; +use serde::{de::DeserializeOwned, Serialize}; +use std::marker::PhantomData; + +impl Kv +where + K: Serialize + DeserializeOwned, + V: Serialize + DeserializeOwned, +{ + /// Get a value. + pub async fn get(&self, key: &K) -> anyhow::Result { + let key = serde_json::to_vec(key)?; + let request = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Get(key), + })?) + .expects_response(self.timeout); + + let response = hyperapp::send::(request).await?; + + match response { + KvResponse::Get { .. } => { + let bytes = match get_blob() { + Some(bytes) => bytes.bytes, + None => return Err(anyhow::anyhow!("kv: no blob")), + }; + let value = serde_json::from_slice::(&bytes) + .map_err(|e| anyhow::anyhow!("Failed to deserialize value: {}", e))?; + Ok(value) + } + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response")), + } + } + + /// Get a value as a different type T + pub async fn get_as(&self, key: &K) -> anyhow::Result + where + T: DeserializeOwned, + { + let key = serde_json::to_vec(key)?; + let request = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Get(key), + })?) + .expects_response(self.timeout); + + let response = hyperapp::send::(request).await?; + + match response { + KvResponse::Get { .. } => { + let bytes = match get_blob() { + Some(bytes) => bytes.bytes, + None => return Err(anyhow::anyhow!("kv: no blob")), + }; + let value = serde_json::from_slice::(&bytes) + .map_err(|e| anyhow::anyhow!("Failed to deserialize value: {}", e))?; + Ok(value) + } + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response")), + } + } + + /// Set a value, optionally in a transaction. + pub async fn set(&self, key: &K, value: &V, tx_id: Option) -> anyhow::Result<()> { + let key = serde_json::to_vec(key)?; + let value = serde_json::to_vec(value)?; + let request = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Set { key, tx_id }, + })?) + .blob_bytes(value) + .expects_response(self.timeout); + + let response = hyperapp::send::(request).await?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response")), + } + } + + /// Set a value as a different type T + pub async fn set_as(&self, key: &K, value: &T, tx_id: Option) -> anyhow::Result<()> + where + T: Serialize, + { + let key = serde_json::to_vec(key)?; + let value = serde_json::to_vec(value)?; + let request = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Set { key, tx_id }, + })?) + .blob_bytes(value) + .expects_response(self.timeout); + + let response = hyperapp::send::(request).await?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response")), + } + } + + /// Delete a value, optionally in a transaction. + pub async fn delete(&self, key: &K, tx_id: Option) -> anyhow::Result<()> { + let key = serde_json::to_vec(key)?; + let request = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Delete { key, tx_id }, + })?) + .expects_response(self.timeout); + + let response = hyperapp::send::(request).await?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response")), + } + } + + /// Delete a value with a different key type + pub async fn delete_as(&self, key: &T, tx_id: Option) -> anyhow::Result<()> + where + T: Serialize, + { + let key = serde_json::to_vec(key)?; + let request = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Delete { key, tx_id }, + })?) + .expects_response(self.timeout); + + let response = hyperapp::send::(request).await?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response")), + } + } + + /// Begin a transaction. + pub async fn begin_tx(&self) -> anyhow::Result { + let request = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::BeginTx, + })?) + .expects_response(self.timeout); + + let response = hyperapp::send::(request).await?; + + match response { + KvResponse::BeginTx { tx_id } => Ok(tx_id), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response")), + } + } +} + +/// Removes and deletes a kv db. +pub async fn remove_db( + package_id: PackageId, + db: &str, + timeout: Option, +) -> anyhow::Result<()> { + let timeout = timeout.unwrap_or(5); + + let request = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: package_id.clone(), + db: db.to_string(), + action: KvAction::RemoveDb, + })?) + .expects_response(timeout); + + let response = hyperapp::send::(request).await?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response")), + } +} + +/// Helper function to open a raw bytes key-value store +pub async fn open_raw( + package_id: PackageId, + db: &str, + timeout: Option, +) -> anyhow::Result, Vec>> { + open(package_id, db, timeout).await +} + +/// Opens or creates a kv db. +pub async fn open( + package_id: PackageId, + db: &str, + timeout: Option, +) -> anyhow::Result> +where + K: Serialize + DeserializeOwned, + V: Serialize + DeserializeOwned, +{ + let timeout = timeout.unwrap_or(5); + + let request = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: package_id.clone(), + db: db.to_string(), + action: KvAction::Open, + })?) + .expects_response(timeout); + + let response = hyperapp::send::(request).await?; + + match response { + KvResponse::Ok => Ok(Kv { + package_id, + db: db.to_string(), + timeout, + _marker: PhantomData, + }), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response")), + } +} diff --git a/src/kv/kv_sync.rs b/src/kv/kv_sync.rs new file mode 100644 index 0000000..a4fb404 --- /dev/null +++ b/src/kv/kv_sync.rs @@ -0,0 +1,413 @@ +use crate::{ + get_blob, + kv::{Kv, KvAction, KvRequest, KvResponse}, + Message, PackageId, Request, +}; +use serde::{de::DeserializeOwned, Serialize}; +use std::marker::PhantomData; + +impl Kv +where + K: Serialize + DeserializeOwned, + V: Serialize + DeserializeOwned, +{ + /// Get a value. + pub fn get(&self, key: &K) -> anyhow::Result { + let key = serde_json::to_vec(key)?; + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Get(key), + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Get { .. } => { + let bytes = match get_blob() { + Some(bytes) => bytes.bytes, + None => return Err(anyhow::anyhow!("kv: no blob")), + }; + let value = serde_json::from_slice::(&bytes) + .map_err(|e| anyhow::anyhow!("Failed to deserialize value: {}", e))?; + Ok(value) + } + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } + + /// Get a value as a different type T + pub fn get_as(&self, key: &K) -> anyhow::Result + where + T: DeserializeOwned, + { + let key = serde_json::to_vec(key)?; + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Get(key), + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Get { .. } => { + let bytes = match get_blob() { + Some(bytes) => bytes.bytes, + None => return Err(anyhow::anyhow!("kv: no blob")), + }; + let value = serde_json::from_slice::(&bytes) + .map_err(|e| anyhow::anyhow!("Failed to deserialize value: {}", e))?; + Ok(value) + } + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } + + /// Set a value, optionally in a transaction. + pub fn set(&self, key: &K, value: &V, tx_id: Option) -> anyhow::Result<()> { + let key = serde_json::to_vec(key)?; + let value = serde_json::to_vec(value)?; + + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Set { key, tx_id }, + })?) + .blob_bytes(value) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } + + /// Set a value as a different type T + pub fn set_as(&self, key: &K, value: &T, tx_id: Option) -> anyhow::Result<()> + where + T: Serialize, + { + let key = serde_json::to_vec(key)?; + let value = serde_json::to_vec(value)?; + + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Set { key, tx_id }, + })?) + .blob_bytes(value) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } + + /// Delete a value, optionally in a transaction. + pub fn delete(&self, key: &K, tx_id: Option) -> anyhow::Result<()> { + let key = serde_json::to_vec(key)?; + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Delete { key, tx_id }, + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } + + /// Delete a value with a different key type + pub fn delete_as(&self, key: &T, tx_id: Option) -> anyhow::Result<()> + where + T: Serialize, + { + let key = serde_json::to_vec(key)?; + + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Delete { key, tx_id }, + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } + + /// Begin a transaction. + pub fn begin_tx(&self) -> anyhow::Result { + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::BeginTx, + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::BeginTx { tx_id } => Ok(tx_id), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } + + /// Commit a transaction. + pub fn commit_tx(&self, tx_id: u64) -> anyhow::Result<()> { + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Commit { tx_id }, + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } +} + +impl Kv, Vec> { + /// Get raw bytes directly + pub fn get_raw(&self, key: &[u8]) -> anyhow::Result> { + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Get(key.to_vec()), + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Get { .. } => { + let bytes = match get_blob() { + Some(bytes) => bytes.bytes, + None => return Err(anyhow::anyhow!("kv: no blob")), + }; + Ok(bytes) + } + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } + + /// Set raw bytes directly + pub fn set_raw(&self, key: &[u8], value: &[u8], tx_id: Option) -> anyhow::Result<()> { + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Set { + key: key.to_vec(), + tx_id, + }, + })?) + .blob_bytes(value.to_vec()) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } + + /// Delete raw bytes directly + pub fn delete_raw(&self, key: &[u8], tx_id: Option) -> anyhow::Result<()> { + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Delete { + key: key.to_vec(), + tx_id, + }, + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } +} + +/// Helper function to open a raw bytes key-value store +pub fn open_raw( + package_id: PackageId, + db: &str, + timeout: Option, +) -> anyhow::Result, Vec>> { + open(package_id, db, timeout) +} + +/// Opens or creates a kv db. +pub fn open(package_id: PackageId, db: &str, timeout: Option) -> anyhow::Result> +where + K: Serialize + DeserializeOwned, + V: Serialize + DeserializeOwned, +{ + let timeout = timeout.unwrap_or(5); + + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: package_id.clone(), + db: db.to_string(), + action: KvAction::Open, + })?) + .send_and_await_response(timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Ok => Ok(Kv { + package_id, + db: db.to_string(), + timeout, + _marker: PhantomData, + }), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } +} + +/// Removes and deletes a kv db. +pub fn remove_db(package_id: PackageId, db: &str, timeout: Option) -> anyhow::Result<()> { + let timeout = timeout.unwrap_or(5); + + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: package_id.clone(), + db: db.to_string(), + action: KvAction::RemoveDb, + })?) + .send_and_await_response(timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } +} diff --git a/src/lib.rs b/src/lib.rs index 6cd68d9..c72ace8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -87,6 +87,9 @@ pub mod wallet; /// A set of types and macros for writing "script" processes. pub mod scripting; +#[cfg(feature = "hyperapp")] +pub mod hyperapp; + #[cfg(feature = "hyperwallet")] pub mod hyperwallet_client; diff --git a/src/net.rs b/src/net.rs index 371e465..6a10b5d 100644 --- a/src/net.rs +++ b/src/net.rs @@ -1,7 +1,12 @@ -use crate::{get_blob, Address, NodeId, Request, SendError}; +#[cfg(not(feature = "hyperapp"))] +use crate::SendError; +use crate::{get_blob, Address, NodeId, Request}; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; +#[cfg(feature = "hyperapp")] +use crate::hyperapp; + // // Networking protocol types and functions for interacting with it // @@ -166,6 +171,7 @@ impl HnsUpdate { /// /// This function uses a 30-second timeout to reach `net:distro:sys`. If more /// control over the timeout is needed, create a [`Request`] directly. +#[cfg(not(feature = "hyperapp"))] pub fn sign(message: T) -> Result, SendError> where T: Into>, @@ -184,8 +190,40 @@ where /// to verify the signature, which takes a `from` address to match against /// the prepended signing [`Address`] of the source process. /// +/// Sign a message with the node's networking key. This may be used to prove +/// identity to other parties outside of using the networking protocol. +/// +/// Note that the given message will be prepended with the source [`Address`] +/// of this message. This is done in order to not allow different processes +/// on the same node to sign messages for/as one another. The receiver of +/// the signed message should use [`verify()`] to verify the signature, which +/// takes a `from` address to match against that prepended signing [`Address`]. +/// /// This function uses a 30-second timeout to reach `net:distro:sys`. If more /// control over the timeout is needed, create a [`Request`] directly. +#[cfg(feature = "hyperapp")] +pub async fn sign(message: T) -> Result, hyperapp::AppSendError> +where + T: Into>, +{ + let request = Request::to(("our", "net", "distro", "sys")) + .body(rmp_serde::to_vec(&NetAction::Sign).unwrap()) + .blob_bytes(message.into()) + .expects_response(30); + + let _response = hyperapp::send_rmp::(request).await?; + Ok(get_blob().unwrap().bytes) +} + +/// Verify a signature on a message. +/// +/// The receiver of a signature created using [`sign`] should use this function +/// to verify the signature, which takes a `from` address to match against +/// the prepended signing [`Address`] of the source process. +/// +/// This function uses a 30-second timeout to reach `net:distro:sys`. If more +/// control over the timeout is needed, create a [`Request`] directly. +#[cfg(not(feature = "hyperapp"))] pub fn verify(from: T, message: U, signature: V) -> Result where T: Into
, @@ -213,11 +251,51 @@ where }) } +/// Get a [`crate::hypermap::Hypermap`] entry name from its namehash. +/// +/// Verify a signature on a message. +/// +/// The receiver of a signature created using [`sign`] should use this function +/// to verify the signature, which takes a `from` address to match against +/// the prepended signing [`Address`] of the source process. +/// +/// This function uses a 30-second timeout to reach `net:distro:sys`. If more +/// control over the timeout is needed, create a [`Request`] directly. +#[cfg(feature = "hyperapp")] +pub async fn verify( + from: T, + message: U, + signature: V, +) -> Result +where + T: Into
, + U: Into>, + V: Into>, +{ + let request = Request::to(("our", "net", "distro", "sys")) + .body( + rmp_serde::to_vec(&NetAction::Verify { + from: from.into(), + signature: signature.into(), + }) + .unwrap(), + ) + .blob_bytes(message.into()) + .expects_response(30); + + let response = hyperapp::send_rmp::(request).await?; + let NetResponse::Verified(valid) = response else { + return Ok(false); + }; + Ok(valid) +} + /// Get a [`crate::hypermap::Hypermap`] entry name from its namehash. /// /// Default timeout is 30 seconds. Note that the responsiveness of the indexer /// will depend on the block option used. The indexer will wait until it has /// seen the block given to respond. +#[cfg(not(feature = "hyperapp"))] pub fn get_name(namehash: T, block: Option, timeout: Option) -> Option where T: Into, @@ -242,3 +320,30 @@ where maybe_name } + +/// Get a [`crate::hypermap::Hypermap`] entry name from its namehash. +/// +/// Default timeout is 30 seconds. Note that the responsiveness of the indexer +/// will depend on the block option used. The indexer will wait until it has +/// seen the block given to respond. +#[cfg(feature = "hyperapp")] +pub async fn get_name(namehash: T, block: Option, timeout: Option) -> Option +where + T: Into, +{ + let request = Request::to(("our", "hns-indexer", "hns-indexer", "sys")) + .body( + serde_json::to_vec(&IndexerRequests::NamehashToName(NamehashToNameRequest { + hash: namehash.into(), + block: block.unwrap_or(0), + })) + .unwrap(), + ) + .expects_response(timeout.unwrap_or(30)); + + let response = hyperapp::send::(request).await.ok()?; + + let IndexerResponses::Name(maybe_name) = response; + + maybe_name +} diff --git a/src/sign.rs b/src/sign.rs index 7611425..92a5186 100644 --- a/src/sign.rs +++ b/src/sign.rs @@ -11,6 +11,10 @@ use crate::hyperware::process::sign::{ }; use crate::{last_blob, Address, Request}; +#[cfg(feature = "hyperapp")] +use crate::hyperapp; + +#[cfg(not(feature = "hyperapp"))] pub fn net_key_sign(message: Vec) -> anyhow::Result> { let response = Request::to(("our", "sign", "sign", "sys")) .body(serde_json::to_vec(&SignRequest::NetKeySign).unwrap()) @@ -27,6 +31,26 @@ pub fn net_key_sign(message: Vec) -> anyhow::Result> { Ok(last_blob().unwrap().bytes) } +#[cfg(feature = "hyperapp")] +pub async fn net_key_sign(message: Vec) -> anyhow::Result> { + let request = Request::to(("our", "sign", "sign", "sys")) + .body(serde_json::to_vec(&SignRequest::NetKeySign).unwrap()) + .blob_bytes(message) + .expects_response(10); + + let response = hyperapp::send::(request).await?; + + let SignResponse::NetKeySign = response else { + return Err(anyhow::anyhow!( + "unexpected response from sign:sign:sys: {:?}", + response, + )); + }; + + Ok(last_blob().unwrap().bytes) +} + +#[cfg(not(feature = "hyperapp"))] pub fn net_key_verify( message: Vec, signer: &Address, @@ -53,6 +77,35 @@ pub fn net_key_verify( Ok(response) } +#[cfg(feature = "hyperapp")] +pub async fn net_key_verify( + message: Vec, + signer: &Address, + signature: Vec, +) -> anyhow::Result { + let request = Request::to(("our", "sign", "sign", "sys")) + .body( + serde_json::to_vec(&SignRequest::NetKeyVerify(NetKeyVerifyRequest { + node: signer.to_string(), + signature, + })) + .unwrap(), + ) + .blob_bytes(message) + .expects_response(10); + + let response = hyperapp::send::(request).await?; + + let SignResponse::NetKeyVerify(verified) = response else { + return Err(anyhow::anyhow!( + "unexpected response from sign:sign:sys: {:?}", + response, + )); + }; + + Ok(verified) +} + impl Serialize for NetKeyVerifyRequest { fn serialize(&self, serializer: S) -> Result where diff --git a/src/sqlite.rs b/src/sqlite.rs index 30c5bdd..f2d692d 100644 --- a/src/sqlite.rs +++ b/src/sqlite.rs @@ -1,8 +1,17 @@ -use crate::{get_blob, Message, PackageId, Request}; +use crate::PackageId; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; use thiserror::Error; +#[cfg(not(feature = "hyperapp"))] +mod sqlite_sync; +#[cfg(not(feature = "hyperapp"))] +pub use sqlite_sync::{open, remove_db}; + +#[cfg(feature = "hyperapp")] +mod sqlite_async; +#[cfg(feature = "hyperapp")] +pub use sqlite_async::{open, remove_db}; + /// Actions are sent to a specific SQLite database. `db` is the name, /// `package_id` is the [`PackageId`] that created the database. Capabilities /// are checked: you can access another process's database if it has given @@ -176,199 +185,3 @@ pub struct Sqlite { pub db: String, pub timeout: u64, } - -impl Sqlite { - /// Query database. Only allows sqlite read keywords. - pub fn read( - &self, - query: String, - params: Vec, - ) -> anyhow::Result>> { - let res = Request::new() - .target(("our", "sqlite", "distro", "sys")) - .body(serde_json::to_vec(&SqliteRequest { - package_id: self.package_id.clone(), - db: self.db.clone(), - action: SqliteAction::Query(query), - })?) - .blob_bytes(serde_json::to_vec(¶ms)?) - .send_and_await_response(self.timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - SqliteResponse::Read => { - let blob = get_blob().ok_or_else(|| SqliteError::MalformedRequest)?; - let values = serde_json::from_slice::< - Vec>, - >(&blob.bytes) - .map_err(|_| SqliteError::MalformedRequest)?; - Ok(values) - } - SqliteResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!( - "sqlite: unexpected response {:?}", - response - )), - } - } - _ => Err(anyhow::anyhow!("sqlite: unexpected message: {:?}", res)), - } - } - - /// Execute a statement. Only allows sqlite write keywords. - pub fn write( - &self, - statement: String, - params: Vec, - tx_id: Option, - ) -> anyhow::Result<()> { - let res = Request::new() - .target(("our", "sqlite", "distro", "sys")) - .body(serde_json::to_vec(&SqliteRequest { - package_id: self.package_id.clone(), - db: self.db.clone(), - action: SqliteAction::Write { statement, tx_id }, - })?) - .blob_bytes(serde_json::to_vec(¶ms)?) - .send_and_await_response(self.timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - SqliteResponse::Ok => Ok(()), - SqliteResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!( - "sqlite: unexpected response {:?}", - response - )), - } - } - _ => Err(anyhow::anyhow!("sqlite: unexpected message: {:?}", res)), - } - } - - /// Begin a transaction. - pub fn begin_tx(&self) -> anyhow::Result { - let res = Request::new() - .target(("our", "sqlite", "distro", "sys")) - .body(serde_json::to_vec(&SqliteRequest { - package_id: self.package_id.clone(), - db: self.db.clone(), - action: SqliteAction::BeginTx, - })?) - .send_and_await_response(self.timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - SqliteResponse::BeginTx { tx_id } => Ok(tx_id), - SqliteResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!( - "sqlite: unexpected response {:?}", - response - )), - } - } - _ => Err(anyhow::anyhow!("sqlite: unexpected message: {:?}", res)), - } - } - - /// Commit a transaction. - pub fn commit_tx(&self, tx_id: u64) -> anyhow::Result<()> { - let res = Request::new() - .target(("our", "sqlite", "distro", "sys")) - .body(serde_json::to_vec(&SqliteRequest { - package_id: self.package_id.clone(), - db: self.db.clone(), - action: SqliteAction::Commit { tx_id }, - })?) - .send_and_await_response(self.timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - SqliteResponse::Ok => Ok(()), - SqliteResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!( - "sqlite: unexpected response {:?}", - response - )), - } - } - _ => Err(anyhow::anyhow!("sqlite: unexpected message: {:?}", res)), - } - } -} - -/// Open or create sqlite database. -pub fn open(package_id: PackageId, db: &str, timeout: Option) -> anyhow::Result { - let timeout = timeout.unwrap_or(5); - - let res = Request::new() - .target(("our", "sqlite", "distro", "sys")) - .body(serde_json::to_vec(&SqliteRequest { - package_id: package_id.clone(), - db: db.to_string(), - action: SqliteAction::Open, - })?) - .send_and_await_response(timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - SqliteResponse::Ok => Ok(Sqlite { - package_id, - db: db.to_string(), - timeout, - }), - SqliteResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!( - "sqlite: unexpected response {:?}", - response - )), - } - } - _ => Err(anyhow::anyhow!("sqlite: unexpected message: {:?}", res)), - } -} - -/// Remove and delete sqlite database. -pub fn remove_db(package_id: PackageId, db: &str, timeout: Option) -> anyhow::Result<()> { - let timeout = timeout.unwrap_or(5); - - let res = Request::new() - .target(("our", "sqlite", "distro", "sys")) - .body(serde_json::to_vec(&SqliteRequest { - package_id: package_id.clone(), - db: db.to_string(), - action: SqliteAction::RemoveDb, - })?) - .send_and_await_response(timeout)?; - - match res { - Ok(Message::Response { body, .. }) => { - let response = serde_json::from_slice::(&body)?; - - match response { - SqliteResponse::Ok => Ok(()), - SqliteResponse::Err(error) => Err(error.into()), - _ => Err(anyhow::anyhow!( - "sqlite: unexpected response {:?}", - response - )), - } - } - _ => Err(anyhow::anyhow!("sqlite: unexpected message: {:?}", res)), - } -} diff --git a/src/sqlite/sqlite_async.rs b/src/sqlite/sqlite_async.rs new file mode 100644 index 0000000..8caef73 --- /dev/null +++ b/src/sqlite/sqlite_async.rs @@ -0,0 +1,175 @@ +use crate::{ + get_blob, hyperapp, + sqlite::{Sqlite, SqliteAction, SqliteError, SqliteRequest, SqliteResponse}, + PackageId, Request, +}; +use std::collections::HashMap; + +impl Sqlite { + /// Query database. Only allows sqlite read keywords. + pub async fn read( + &self, + query: String, + params: Vec, + ) -> anyhow::Result>> { + let request = Request::new() + .target(("our", "sqlite", "distro", "sys")) + .body(serde_json::to_vec(&SqliteRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: SqliteAction::Query(query), + })?) + .blob_bytes(serde_json::to_vec(¶ms)?) + .expects_response(self.timeout); + + let response = hyperapp::send::(request).await?; + + match response { + SqliteResponse::Read => { + let blob = get_blob().ok_or_else(|| SqliteError::MalformedRequest)?; + let values = + serde_json::from_slice::>>(&blob.bytes) + .map_err(|_| SqliteError::MalformedRequest)?; + Ok(values) + } + SqliteResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!( + "sqlite: unexpected response {:?}", + response + )), + } + } + + /// Execute a statement. Only allows sqlite write keywords. + pub async fn write( + &self, + statement: String, + params: Vec, + tx_id: Option, + ) -> anyhow::Result<()> { + let request = Request::new() + .target(("our", "sqlite", "distro", "sys")) + .body(serde_json::to_vec(&SqliteRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: SqliteAction::Write { statement, tx_id }, + })?) + .blob_bytes(serde_json::to_vec(¶ms)?) + .expects_response(self.timeout); + + let response = hyperapp::send::(request).await?; + + match response { + SqliteResponse::Ok => Ok(()), + SqliteResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!( + "sqlite: unexpected response {:?}", + response + )), + } + } + + /// Begin a transaction. + pub async fn begin_tx(&self) -> anyhow::Result { + let request = Request::new() + .target(("our", "sqlite", "distro", "sys")) + .body(serde_json::to_vec(&SqliteRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: SqliteAction::BeginTx, + })?) + .expects_response(self.timeout); + + let response = hyperapp::send::(request).await?; + + match response { + SqliteResponse::BeginTx { tx_id } => Ok(tx_id), + SqliteResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!( + "sqlite: unexpected response {:?}", + response + )), + } + } + + /// Commit a transaction. + pub async fn commit_tx(&self, tx_id: u64) -> anyhow::Result<()> { + let request = Request::new() + .target(("our", "sqlite", "distro", "sys")) + .body(serde_json::to_vec(&SqliteRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: SqliteAction::Commit { tx_id }, + })?) + .expects_response(self.timeout); + + let response = hyperapp::send::(request).await?; + + match response { + SqliteResponse::Ok => Ok(()), + SqliteResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!( + "sqlite: unexpected response {:?}", + response + )), + } + } +} + +/// Open or create sqlite database. +pub async fn open(package_id: PackageId, db: &str, timeout: Option) -> anyhow::Result { + let timeout = timeout.unwrap_or(5); + + let request = Request::new() + .target(("our", "sqlite", "distro", "sys")) + .body(serde_json::to_vec(&SqliteRequest { + package_id: package_id.clone(), + db: db.to_string(), + action: SqliteAction::Open, + })?) + .expects_response(timeout); + + let response = hyperapp::send::(request).await?; + + match response { + SqliteResponse::Ok => Ok(Sqlite { + package_id, + db: db.to_string(), + timeout, + }), + SqliteResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!( + "sqlite: unexpected response {:?}", + response + )), + } +} + +/// Remove and delete sqlite database. +pub async fn remove_db( + package_id: PackageId, + db: &str, + timeout: Option, +) -> anyhow::Result<()> { + let timeout = timeout.unwrap_or(5); + + let request = Request::new() + .target(("our", "sqlite", "distro", "sys")) + .body(serde_json::to_vec(&SqliteRequest { + package_id: package_id.clone(), + db: db.to_string(), + action: SqliteAction::RemoveDb, + })?) + .expects_response(timeout); + + let response = hyperapp::send::(request).await?; + + match response { + SqliteResponse::Ok => Ok(()), + SqliteResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!( + "sqlite: unexpected response {:?}", + response + )), + } +} diff --git a/src/sqlite/sqlite_sync.rs b/src/sqlite/sqlite_sync.rs new file mode 100644 index 0000000..303bbaa --- /dev/null +++ b/src/sqlite/sqlite_sync.rs @@ -0,0 +1,202 @@ +use crate::{ + get_blob, + sqlite::{Sqlite, SqliteAction, SqliteError, SqliteRequest, SqliteResponse}, + Message, PackageId, Request, +}; +use std::collections::HashMap; + +impl Sqlite { + /// Query database. Only allows sqlite read keywords. + pub fn read( + &self, + query: String, + params: Vec, + ) -> anyhow::Result>> { + let res = Request::new() + .target(("our", "sqlite", "distro", "sys")) + .body(serde_json::to_vec(&SqliteRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: SqliteAction::Query(query), + })?) + .blob_bytes(serde_json::to_vec(¶ms)?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + SqliteResponse::Read => { + let blob = get_blob().ok_or_else(|| SqliteError::MalformedRequest)?; + let values = serde_json::from_slice::< + Vec>, + >(&blob.bytes) + .map_err(|_| SqliteError::MalformedRequest)?; + Ok(values) + } + SqliteResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!( + "sqlite: unexpected response {:?}", + response + )), + } + } + _ => Err(anyhow::anyhow!("sqlite: unexpected message: {:?}", res)), + } + } + + /// Execute a statement. Only allows sqlite write keywords. + pub fn write( + &self, + statement: String, + params: Vec, + tx_id: Option, + ) -> anyhow::Result<()> { + let res = Request::new() + .target(("our", "sqlite", "distro", "sys")) + .body(serde_json::to_vec(&SqliteRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: SqliteAction::Write { statement, tx_id }, + })?) + .blob_bytes(serde_json::to_vec(¶ms)?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + SqliteResponse::Ok => Ok(()), + SqliteResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!( + "sqlite: unexpected response {:?}", + response + )), + } + } + _ => Err(anyhow::anyhow!("sqlite: unexpected message: {:?}", res)), + } + } + + /// Begin a transaction. + pub fn begin_tx(&self) -> anyhow::Result { + let res = Request::new() + .target(("our", "sqlite", "distro", "sys")) + .body(serde_json::to_vec(&SqliteRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: SqliteAction::BeginTx, + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + SqliteResponse::BeginTx { tx_id } => Ok(tx_id), + SqliteResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!( + "sqlite: unexpected response {:?}", + response + )), + } + } + _ => Err(anyhow::anyhow!("sqlite: unexpected message: {:?}", res)), + } + } + + /// Commit a transaction. + pub fn commit_tx(&self, tx_id: u64) -> anyhow::Result<()> { + let res = Request::new() + .target(("our", "sqlite", "distro", "sys")) + .body(serde_json::to_vec(&SqliteRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: SqliteAction::Commit { tx_id }, + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + SqliteResponse::Ok => Ok(()), + SqliteResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!( + "sqlite: unexpected response {:?}", + response + )), + } + } + _ => Err(anyhow::anyhow!("sqlite: unexpected message: {:?}", res)), + } + } +} + +/// Open or create sqlite database. +pub fn open(package_id: PackageId, db: &str, timeout: Option) -> anyhow::Result { + let timeout = timeout.unwrap_or(5); + + let res = Request::new() + .target(("our", "sqlite", "distro", "sys")) + .body(serde_json::to_vec(&SqliteRequest { + package_id: package_id.clone(), + db: db.to_string(), + action: SqliteAction::Open, + })?) + .send_and_await_response(timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + SqliteResponse::Ok => Ok(Sqlite { + package_id, + db: db.to_string(), + timeout, + }), + SqliteResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!( + "sqlite: unexpected response {:?}", + response + )), + } + } + _ => Err(anyhow::anyhow!("sqlite: unexpected message: {:?}", res)), + } +} + +/// Remove and delete sqlite database. +pub fn remove_db(package_id: PackageId, db: &str, timeout: Option) -> anyhow::Result<()> { + let timeout = timeout.unwrap_or(5); + + let res = Request::new() + .target(("our", "sqlite", "distro", "sys")) + .body(serde_json::to_vec(&SqliteRequest { + package_id: package_id.clone(), + db: db.to_string(), + action: SqliteAction::RemoveDb, + })?) + .send_and_await_response(timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + SqliteResponse::Ok => Ok(()), + SqliteResponse::Err(error) => Err(error.into()), + _ => Err(anyhow::anyhow!( + "sqlite: unexpected response {:?}", + response + )), + } + } + _ => Err(anyhow::anyhow!("sqlite: unexpected message: {:?}", res)), + } +} diff --git a/src/timer.rs b/src/timer.rs index a946800..574095a 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -1,6 +1,11 @@ -use crate::{Context, Message, Request, SendError}; +use crate::{Context, Request}; +#[cfg(not(feature = "hyperapp"))] +use crate::{Message, SendError}; use serde::{Deserialize, Serialize}; +#[cfg(feature = "hyperapp")] +use crate::hyperapp; + /// The [`Request::body()`] field for requests to `timer:distro:sys`, a runtime module /// that allows processes to set timers with a duration specified in milliseconds. /// @@ -33,6 +38,7 @@ pub fn set_timer(duration: u64, context: Option) { /// Set a timer using the runtime that will return a [`crate::Response`] after the specified duration, /// then wait for that timer to resolve. The duration should be a number of milliseconds. +#[cfg(not(feature = "hyperapp"))] pub fn set_and_await_timer(duration: u64) -> Result { Request::to(("our", "timer", "distro", "sys")) .body(TimerAction::SetTimer(duration)) @@ -40,3 +46,10 @@ pub fn set_and_await_timer(duration: u64) -> Result { // safe to unwrap this call when we know we've set both target and body .unwrap() } + +/// Set a timer using the runtime that will return a [`crate::Response`] after the specified duration, +/// then wait for that timer to resolve. The duration should be a number of milliseconds. +#[cfg(feature = "hyperapp")] +pub async fn set_and_await_timer(duration: u64) -> Result<(), hyperapp::AppSendError> { + hyperapp::sleep(duration).await +} diff --git a/src/vfs/directory.rs b/src/vfs/directory.rs index ceebbf9..e23cec1 100644 --- a/src/vfs/directory.rs +++ b/src/vfs/directory.rs @@ -1,5 +1,8 @@ use super::{parse_response, vfs_request, DirEntry, FileType, VfsAction, VfsError, VfsResponse}; +#[cfg(feature = "hyperapp")] +pub mod directory_async; + /// VFS (Virtual File System) helper struct for a directory. /// Opening or creating a directory will give you a `Result`. /// You can call it's impl functions to interact with it. @@ -79,6 +82,7 @@ pub fn open_dir(path: &str, create: bool, timeout: Option) -> Result) -> Result<(), VfsError> { let timeout = timeout.unwrap_or(5); @@ -96,3 +100,9 @@ pub fn remove_dir(path: &str, timeout: Option) -> Result<(), VfsError> { }), } } + +/// Removes a dir at path, errors if path not found or path is not a `Directory`. +#[cfg(feature = "hyperapp")] +pub async fn remove_dir(path: &str, timeout: Option) -> Result<(), VfsError> { + directory_async::remove_dir_async(path, timeout).await +} diff --git a/src/vfs/directory/directory_async.rs b/src/vfs/directory/directory_async.rs new file mode 100644 index 0000000..3bc54d6 --- /dev/null +++ b/src/vfs/directory/directory_async.rs @@ -0,0 +1,24 @@ +use crate::{ + hyperapp, + vfs::{vfs_request, VfsAction, VfsError, VfsResponse}, +}; + +/// Removes a dir at path, errors if path not found or path is not a `Directory`. +pub async fn remove_dir_async(path: &str, timeout: Option) -> Result<(), VfsError> { + let timeout = timeout.unwrap_or(5); + + let request = vfs_request(path, VfsAction::RemoveDir).expects_response(timeout); + + let response = hyperapp::send::(request) + .await + .map_err(|_| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match response { + VfsResponse::Ok => Ok(()), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: path.to_string(), + }), + } +} diff --git a/src/vfs/directory_async.rs b/src/vfs/directory_async.rs new file mode 100644 index 0000000..0540f84 --- /dev/null +++ b/src/vfs/directory_async.rs @@ -0,0 +1,100 @@ +use super::{parse_response, vfs_request, DirEntry, FileType, VfsAction, VfsError, VfsResponse}; +use crate::hyperapp; + +pub struct DirectoryAsync { + pub path: String, + pub timeout: u64, +} + +impl DirectoryAsync { + pub async fn read(&self) -> Result, VfsError> { + let request = vfs_request(&self.path, VfsAction::ReadDir) + .expects_response(self.timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|_| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::ReadDir(entries) => Ok(entries), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: self.path.clone(), + }), + } + } +} + +pub async fn open_dir_async(path: &str, create: bool, timeout: Option) -> Result { + let timeout = timeout.unwrap_or(5); + if !create { + let request = vfs_request(path, VfsAction::Metadata) + .expects_response(timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|_| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Metadata(m) => { + if m.file_type != FileType::Directory { + return Err(VfsError::IOError( + "entry at path is not a directory".to_string(), + )); + } + } + VfsResponse::Err(e) => return Err(e), + _ => { + return Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: path.to_string(), + }) + } + } + + return Ok(DirectoryAsync { + path: path.to_string(), + timeout, + }); + } + + let request = vfs_request(path, VfsAction::CreateDirAll) + .expects_response(timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|_| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Ok => Ok(DirectoryAsync { + path: path.to_string(), + timeout, + }), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: path.to_string(), + }), + } +} + +pub async fn remove_dir_async(path: &str, timeout: Option) -> Result<(), VfsError> { + let timeout = timeout.unwrap_or(5); + + let request = vfs_request(path, VfsAction::RemoveDir) + .expects_response(timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|_| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Ok => Ok(()), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: path.to_string(), + }), + } +} \ No newline at end of file diff --git a/src/vfs/file.rs b/src/vfs/file.rs index 4a2b716..e0d6cb4 100644 --- a/src/vfs/file.rs +++ b/src/vfs/file.rs @@ -3,6 +3,9 @@ use super::{ }; use crate::{get_blob, PackageId}; +#[cfg(feature = "hyperapp")] +pub mod file_async; + /// VFS (Virtual File System) helper struct for a file. /// Opening or creating a `File` will give you a `Result`. /// You can call its impl functions to interact with it. @@ -366,6 +369,7 @@ pub fn create_file(path: &str, timeout: Option) -> Result { } /// Removes a file at path, errors if path not found or path is not a file. +#[cfg(not(feature = "hyperapp"))] pub fn remove_file(path: &str, timeout: Option) -> Result<(), VfsError> { let timeout = timeout.unwrap_or(5); @@ -383,3 +387,9 @@ pub fn remove_file(path: &str, timeout: Option) -> Result<(), VfsError> { }), } } + +/// Removes a file at path, errors if path not found or path is not a file. +#[cfg(feature = "hyperapp")] +pub async fn remove_file(path: &str, timeout: Option) -> Result<(), VfsError> { + file_async::remove_file_async(path, timeout).await +} diff --git a/src/vfs/file/file_async.rs b/src/vfs/file/file_async.rs new file mode 100644 index 0000000..2d361c0 --- /dev/null +++ b/src/vfs/file/file_async.rs @@ -0,0 +1,24 @@ +use crate::{ + hyperapp, + vfs::{vfs_request, VfsAction, VfsError, VfsResponse}, +}; + +/// Removes a file at path, errors if path not found or path is not a file. +pub async fn remove_file_async(path: &str, timeout: Option) -> Result<(), VfsError> { + let timeout = timeout.unwrap_or(5); + + let request = vfs_request(path, VfsAction::RemoveFile).expects_response(timeout); + + let response = hyperapp::send::(request) + .await + .map_err(|_| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match response { + VfsResponse::Ok => Ok(()), + VfsResponse::Err(e) => Err(e.into()), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: path.to_string(), + }), + } +} diff --git a/src/vfs/file_async.rs b/src/vfs/file_async.rs new file mode 100644 index 0000000..94e92dd --- /dev/null +++ b/src/vfs/file_async.rs @@ -0,0 +1,388 @@ +use super::{ + parse_response, vfs_request, FileMetadata, SeekFrom, VfsAction, VfsError, VfsResponse, +}; +use crate::{get_blob, hyperapp, PackageId}; + +#[derive(serde::Deserialize, serde::Serialize)] +pub struct FileAsync { + pub path: String, + pub timeout: u64, +} + +impl FileAsync { + pub fn new>(path: T, timeout: u64) -> Self { + Self { + path: path.into(), + timeout, + } + } + + pub async fn read(&self) -> Result, VfsError> { + let request = vfs_request(&self.path, VfsAction::Read) + .expects_response(self.timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Read => { + let data = match get_blob() { + Some(bytes) => bytes.bytes, + None => { + return Err(VfsError::ParseError { + error: "no blob".to_string(), + path: self.path.clone(), + }) + } + }; + Ok(data) + } + VfsResponse::Err(e) => Err(e.into()), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: self.path.clone(), + }), + } + } + + pub async fn read_into(&self, buffer: &mut [u8]) -> Result { + let request = vfs_request(&self.path, VfsAction::Read) + .expects_response(self.timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Read => { + let data = get_blob().unwrap_or_default().bytes; + let len = std::cmp::min(data.len(), buffer.len()); + buffer[..len].copy_from_slice(&data[..len]); + Ok(len) + } + VfsResponse::Err(e) => Err(e.into()), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: self.path.clone(), + }), + } + } + + pub async fn read_at(&self, buffer: &mut [u8]) -> Result { + let length = buffer.len() as u64; + + let request = vfs_request(&self.path, VfsAction::ReadExact { length }) + .expects_response(self.timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Read => { + let data = get_blob().unwrap_or_default().bytes; + let len = std::cmp::min(data.len(), buffer.len()); + buffer[..len].copy_from_slice(&data[..len]); + Ok(len) + } + VfsResponse::Err(e) => Err(e.into()), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: self.path.clone(), + }), + } + } + + pub async fn read_to_end(&self) -> Result, VfsError> { + let request = vfs_request(&self.path, VfsAction::ReadToEnd) + .expects_response(self.timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Read => Ok(get_blob().unwrap_or_default().bytes), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: self.path.clone(), + }), + } + } + + pub async fn read_to_string(&self) -> Result { + let request = vfs_request(&self.path, VfsAction::ReadToString) + .expects_response(self.timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::ReadToString(s) => Ok(s), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: self.path.clone(), + }), + } + } + + pub async fn write(&self, buffer: &[u8]) -> Result<(), VfsError> { + let request = vfs_request(&self.path, VfsAction::Write) + .blob_bytes(buffer) + .expects_response(self.timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Ok => Ok(()), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: self.path.clone(), + }), + } + } + + pub async fn write_all(&mut self, buffer: &[u8]) -> Result<(), VfsError> { + let request = vfs_request(&self.path, VfsAction::WriteAll) + .blob_bytes(buffer) + .expects_response(self.timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Ok => Ok(()), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: self.path.clone(), + }), + } + } + + pub async fn append(&mut self, buffer: &[u8]) -> Result<(), VfsError> { + let request = vfs_request(&self.path, VfsAction::Append) + .blob_bytes(buffer) + .expects_response(self.timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Ok => Ok(()), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: self.path.clone(), + }), + } + } + + pub async fn seek(&mut self, pos: SeekFrom) -> Result { + let request = vfs_request(&self.path, VfsAction::Seek(pos)) + .expects_response(self.timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::SeekFrom { + new_offset: new_pos, + } => Ok(new_pos), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: self.path.clone(), + }), + } + } + + pub async fn copy(&mut self, path: &str) -> Result { + let request = vfs_request( + &self.path, + VfsAction::CopyFile { + new_path: path.to_string(), + }, + ) + .expects_response(self.timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Ok => Ok(FileAsync { + path: path.to_string(), + timeout: self.timeout, + }), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: self.path.clone(), + }), + } + } + + pub async fn set_len(&mut self, size: u64) -> Result<(), VfsError> { + let request = vfs_request(&self.path, VfsAction::SetLen(size)) + .expects_response(self.timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Ok => Ok(()), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: self.path.clone(), + }), + } + } + + pub async fn metadata(&self) -> Result { + let request = vfs_request(&self.path, VfsAction::Metadata) + .expects_response(self.timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Metadata(metadata) => Ok(metadata), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: self.path.clone(), + }), + } + } + + pub async fn sync_all(&self) -> Result<(), VfsError> { + let request = vfs_request(&self.path, VfsAction::SyncAll) + .expects_response(self.timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Ok => Ok(()), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: self.path.clone(), + }), + } + } +} + +impl Drop for FileAsync { + fn drop(&mut self) { + vfs_request(&self.path, VfsAction::CloseFile) + .send() + .unwrap(); + } +} + +pub async fn create_drive_async( + package_id: PackageId, + drive: &str, + timeout: Option, +) -> Result { + let timeout = timeout.unwrap_or(5); + let path = format!("/{}/{}", package_id, drive); + + let request = vfs_request(&path, VfsAction::CreateDrive) + .expects_response(timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Ok => Ok(path), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path, + }), + } +} + +pub async fn open_file_async(path: &str, create: bool, timeout: Option) -> Result { + let timeout = timeout.unwrap_or(5); + + let request = vfs_request(path, VfsAction::OpenFile { create }) + .expects_response(timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Ok => Ok(FileAsync { + path: path.to_string(), + timeout, + }), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: path.to_string(), + }), + } +} + +pub async fn create_file_async(path: &str, timeout: Option) -> Result { + let timeout = timeout.unwrap_or(5); + + let request = vfs_request(path, VfsAction::CreateFile) + .expects_response(timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Ok => Ok(FileAsync { + path: path.to_string(), + timeout, + }), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: path.to_string(), + }), + } +} + +pub async fn remove_file_async(path: &str, timeout: Option) -> Result<(), VfsError> { + let timeout = timeout.unwrap_or(5); + + let request = vfs_request(path, VfsAction::RemoveFile) + .expects_response(timeout); + + let resp_bytes = hyperapp::send_rmp::>(request) + .await + .map_err(|e| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match parse_response(&resp_bytes)? { + VfsResponse::Ok => Ok(()), + VfsResponse::Err(e) => Err(e.into()), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: path.to_string(), + }), + } +} \ No newline at end of file diff --git a/src/vfs/mod.rs b/src/vfs/mod.rs index 0022731..069eb8a 100644 --- a/src/vfs/mod.rs +++ b/src/vfs/mod.rs @@ -124,6 +124,7 @@ where } /// Metadata of a path, returns file type and length. +#[cfg(not(feature = "hyperapp"))] pub fn metadata(path: &str, timeout: Option) -> Result { let timeout = timeout.unwrap_or(5); @@ -142,7 +143,29 @@ pub fn metadata(path: &str, timeout: Option) -> Result) -> Result { + let timeout = timeout.unwrap_or(5); + + let request = vfs_request(path, VfsAction::Metadata).expects_response(timeout); + + let response = crate::hyperapp::send::(request) + .await + .map_err(|_| VfsError::SendError(crate::SendErrorKind::Timeout))?; + + match response { + VfsResponse::Metadata(metadata) => Ok(metadata), + VfsResponse::Err(e) => Err(e), + _ => Err(VfsError::ParseError { + error: "unexpected response".to_string(), + path: path.to_string(), + }), + } +} + /// Removes a path, if it's either a directory or a file. +#[cfg(not(feature = "hyperapp"))] pub fn remove_path(path: &str, timeout: Option) -> Result<(), VfsError> { let meta = metadata(path, timeout)?; @@ -156,6 +179,21 @@ pub fn remove_path(path: &str, timeout: Option) -> Result<(), VfsError> { } } +/// Removes a path, if it's either a directory or a file. +#[cfg(feature = "hyperapp")] +pub async fn remove_path(path: &str, timeout: Option) -> Result<(), VfsError> { + let meta = metadata(path, timeout).await?; + + match meta.file_type { + FileType::Directory => directory::remove_dir(path, timeout).await, + FileType::File => file::remove_file(path, timeout).await, + _ => Err(VfsError::ParseError { + error: "path is not a file or directory".to_string(), + path: path.to_string(), + }), + } +} + pub fn parse_response(body: &[u8]) -> Result { serde_json::from_slice::(body).map_err(|_| VfsError::MalformedRequest) }