Skip to content

Commit a2d3e9e

Browse files
authored
Add ws_client types and functions (#24)
1 parent 412fbfe commit a2d3e9e

File tree

1 file changed

+202
-40
lines changed

1 file changed

+202
-40
lines changed

src/http.rs

Lines changed: 202 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
use crate::vfs::{FileType, VfsAction, VfsRequest, VfsResponse};
22
use crate::{
33
get_blob, Address, LazyLoadBlob as uqBlob, Message, ProcessId, Request as uqRequest,
4-
Response as uqResponse,
4+
Response as uqResponse, SendError,
55
};
66
pub use http::*;
77
use serde::{Deserialize, Serialize};
88
use std::collections::{HashMap, VecDeque};
99
use std::path::Path;
10+
use std::str::FromStr;
1011
use thiserror::Error;
1112

1213
//
@@ -47,18 +48,6 @@ pub struct IncomingHttpRequest {
4748
// BODY is stored in the lazy_load_blob, as bytes
4849
}
4950

50-
/// HTTP Request type that can be shared over WASM boundary to apps.
51-
/// This is the one you send to the `http_client:sys:nectar` service.
52-
#[derive(Debug, Serialize, Deserialize)]
53-
pub struct OutgoingHttpRequest {
54-
pub method: String, // must parse to http::Method
55-
pub version: Option<String>, // must parse to http::Version
56-
pub url: String, // must parse to url::Url
57-
pub headers: HashMap<String, String>,
58-
// BODY is stored in the lazy_load_blob, as bytes
59-
// TIMEOUT is stored in the message expect_response
60-
}
61-
6251
/// HTTP Response type that can be shared over WASM boundary to apps.
6352
/// Respond to [`IncomingHttpRequest`] with this type.
6453
#[derive(Debug, Serialize, Deserialize)]
@@ -68,20 +57,6 @@ pub struct HttpResponse {
6857
// BODY is stored in the lazy_load_blob, as bytes
6958
}
7059

71-
#[derive(Error, Debug, Serialize, Deserialize)]
72-
pub enum HttpClientError {
73-
#[error("http_client: request could not be parsed to HttpRequest: {}.", req)]
74-
BadRequest { req: String },
75-
#[error("http_client: http method not supported: {}", method)]
76-
BadMethod { method: String },
77-
#[error("http_client: url could not be parsed: {}", url)]
78-
BadUrl { url: String },
79-
#[error("http_client: http version not supported: {}", version)]
80-
BadVersion { version: String },
81-
#[error("http_client: failed to execute request {}", error)]
82-
RequestFailed { error: String },
83-
}
84-
8560
/// Request type sent to `http_server:sys:nectar` in order to configure it.
8661
/// You can also send [`type@HttpServerAction::WebSocketPush`], which
8762
/// allows you to push messages across an existing open WebSocket connection.
@@ -234,6 +209,81 @@ impl IncomingHttpRequest {
234209
}
235210
}
236211

212+
/// Request type that can be shared over WASM boundary to apps.
213+
/// This is the one you send to the `http_client:sys:nectar` service.
214+
#[derive(Debug, Serialize, Deserialize)]
215+
pub enum HttpClientAction {
216+
Http(OutgoingHttpRequest),
217+
WebSocketOpen {
218+
url: String,
219+
headers: HashMap<String, String>,
220+
channel_id: u32,
221+
},
222+
WebSocketPush {
223+
channel_id: u32,
224+
message_type: WsMessageType,
225+
},
226+
WebSocketClose {
227+
channel_id: u32,
228+
},
229+
}
230+
231+
/// HTTP Request type that can be shared over WASM boundary to apps.
232+
/// This is the one you send to the `http_client:sys:nectar` service.
233+
#[derive(Debug, Serialize, Deserialize)]
234+
pub struct OutgoingHttpRequest {
235+
pub method: String, // must parse to http::Method
236+
pub version: Option<String>, // must parse to http::Version
237+
pub url: String, // must parse to url::Url
238+
pub headers: HashMap<String, String>,
239+
// BODY is stored in the lazy_load_blob, as bytes
240+
// TIMEOUT is stored in the message expect_response
241+
}
242+
243+
/// WebSocket Client Request type that can be shared over WASM boundary to apps.
244+
/// This comes from an open websocket client connection in the `http_client:sys:nectar` service.
245+
#[derive(Debug, Serialize, Deserialize)]
246+
pub enum HttpClientRequest {
247+
WebSocketPush {
248+
channel_id: u32,
249+
message_type: WsMessageType,
250+
},
251+
WebSocketClose {
252+
channel_id: u32,
253+
},
254+
}
255+
256+
/// HTTP Client Response type that can be shared over WASM boundary to apps.
257+
/// This is the one you receive from the `http_client:sys:nectar` service.
258+
#[derive(Debug, Serialize, Deserialize)]
259+
pub enum HttpClientResponse {
260+
Http(HttpResponse),
261+
WebSocketAck,
262+
}
263+
264+
#[derive(Error, Debug, Serialize, Deserialize)]
265+
pub enum HttpClientError {
266+
// HTTP errors, may also be applicable to OutgoingWebSocketClientRequest::Open
267+
#[error("http_client: request is not valid HttpClientRequest: {}.", req)]
268+
BadRequest { req: String },
269+
#[error("http_client: http method not supported: {}", method)]
270+
BadMethod { method: String },
271+
#[error("http_client: url could not be parsed: {}", url)]
272+
BadUrl { url: String },
273+
#[error("http_client: http version not supported: {}", version)]
274+
BadVersion { version: String },
275+
#[error("http_client: failed to execute request {}", error)]
276+
RequestFailed { error: String },
277+
278+
// WebSocket errors
279+
#[error("websocket_client: failed to open connection {}", url)]
280+
WsOpenFailed { url: String },
281+
#[error("websocket_client: failed to send message {}", req)]
282+
WsPushFailed { req: String },
283+
#[error("websocket_client: failed to close connection {}", channel_id)]
284+
WsCloseFailed { channel_id: u32 },
285+
}
286+
237287
/// Register a new path with the HTTP server. This will cause the HTTP server to
238288
/// forward any requests on this path to the calling process. Requests will be
239289
/// given in the form of `Result<(), HttpServerError>`
@@ -342,12 +392,14 @@ pub fn send_request(
342392
) -> anyhow::Result<()> {
343393
let req = uqRequest::new()
344394
.target(("our", "http_client", "sys", "nectar"))
345-
.body(serde_json::to_vec(&OutgoingHttpRequest {
346-
method: method.to_string(),
347-
version: None,
348-
url: url.to_string(),
349-
headers: headers.unwrap_or_default(),
350-
})?)
395+
.body(serde_json::to_vec(&HttpClientAction::Http(
396+
OutgoingHttpRequest {
397+
method: method.to_string(),
398+
version: None,
399+
url: url.to_string(),
400+
headers: headers.unwrap_or_default(),
401+
},
402+
))?)
351403
.blob_bytes(body);
352404
if let Some(timeout) = timeout {
353405
req.expects_response(timeout).send()
@@ -363,16 +415,16 @@ pub fn send_request_await_response(
363415
headers: Option<HashMap<String, String>>,
364416
timeout: u64,
365417
body: Vec<u8>,
366-
) -> std::result::Result<HttpResponse, HttpClientError> {
418+
) -> std::result::Result<HttpClientResponse, HttpClientError> {
367419
let res = uqRequest::new()
368420
.target(("our", "http_client", "sys", "nectar"))
369421
.body(
370-
serde_json::to_vec(&OutgoingHttpRequest {
422+
serde_json::to_vec(&HttpClientAction::Http(OutgoingHttpRequest {
371423
method: method.to_string(),
372424
version: None,
373425
url: url.to_string(),
374426
headers: headers.unwrap_or_default(),
375-
})
427+
}))
376428
.map_err(|e| HttpClientError::BadRequest {
377429
req: format!("{e:?}"),
378430
})?,
@@ -383,11 +435,12 @@ pub fn send_request_await_response(
383435
error: e.to_string(),
384436
})?;
385437
match res {
386-
Ok(Message::Response { body, .. }) => {
387-
serde_json::from_slice(&body).map_err(|e| HttpClientError::RequestFailed {
438+
Ok(Message::Response { body, .. }) => match serde_json::from_slice(&body) {
439+
Ok(resp) => resp,
440+
Err(e) => Err(HttpClientError::RequestFailed {
388441
error: format!("http_client gave unparsable response: {e}"),
389-
})
390-
}
442+
}),
443+
},
391444
_ => Err(HttpClientError::RequestFailed {
392445
error: "http_client timed out".to_string(),
393446
}),
@@ -580,3 +633,112 @@ pub fn send_ws_push(
580633

581634
Ok(())
582635
}
636+
637+
pub fn open_ws_connection(
638+
node: String,
639+
url: String,
640+
headers: Option<HashMap<String, String>>,
641+
channel_id: u32,
642+
) -> anyhow::Result<()> {
643+
uqRequest::new()
644+
.target(Address::new(
645+
node,
646+
ProcessId::from_str("http_client:sys:nectar").unwrap(),
647+
))
648+
.body(
649+
serde_json::json!(HttpClientAction::WebSocketOpen {
650+
url,
651+
headers: headers.unwrap_or(HashMap::new()),
652+
channel_id,
653+
})
654+
.to_string()
655+
.as_bytes()
656+
.to_vec(),
657+
)
658+
.send()?;
659+
660+
Ok(())
661+
}
662+
663+
pub fn open_ws_connection_and_await(
664+
node: String,
665+
url: String,
666+
headers: Option<HashMap<String, String>>,
667+
channel_id: u32,
668+
) -> std::result::Result<std::result::Result<Message, SendError>, anyhow::Error> {
669+
uqRequest::new()
670+
.target(Address::new(
671+
node,
672+
ProcessId::from_str("http_client:sys:nectar").unwrap(),
673+
))
674+
.body(
675+
serde_json::json!(HttpClientAction::WebSocketOpen {
676+
url,
677+
headers: headers.unwrap_or(HashMap::new()),
678+
channel_id,
679+
})
680+
.to_string()
681+
.as_bytes()
682+
.to_vec(),
683+
)
684+
.send_and_await_response(5)
685+
}
686+
687+
pub fn send_ws_client_push(
688+
node: String,
689+
channel_id: u32,
690+
message_type: WsMessageType,
691+
blob: uqBlob,
692+
) -> std::result::Result<(), anyhow::Error> {
693+
uqRequest::new()
694+
.target(Address::new(
695+
node,
696+
ProcessId::from_str("http_client:sys:nectar").unwrap(),
697+
))
698+
.body(
699+
serde_json::json!(HttpClientAction::WebSocketPush {
700+
channel_id,
701+
message_type,
702+
})
703+
.to_string()
704+
.as_bytes()
705+
.to_vec(),
706+
)
707+
.blob(blob)
708+
.send()
709+
}
710+
711+
pub fn close_ws_connection(node: String, channel_id: u32) -> anyhow::Result<()> {
712+
uqRequest::new()
713+
.target(Address::new(
714+
node,
715+
ProcessId::from_str("http_client:sys:nectar").unwrap(),
716+
))
717+
.body(
718+
serde_json::json!(HttpClientAction::WebSocketClose { channel_id })
719+
.to_string()
720+
.as_bytes()
721+
.to_vec(),
722+
)
723+
.send()?;
724+
725+
Ok(())
726+
}
727+
728+
pub fn close_ws_connection_and_await(
729+
node: String,
730+
channel_id: u32,
731+
) -> std::result::Result<std::result::Result<Message, SendError>, anyhow::Error> {
732+
uqRequest::new()
733+
.target(Address::new(
734+
node,
735+
ProcessId::from_str("http_client:sys:nectar").unwrap(),
736+
))
737+
.body(
738+
serde_json::json!(HttpClientAction::WebSocketClose { channel_id })
739+
.to_string()
740+
.as_bytes()
741+
.to_vec(),
742+
)
743+
.send_and_await_response(5)
744+
}

0 commit comments

Comments
 (0)