From 22260cd882b3734b435c1a09067deefe1824e173 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 6 Jun 2020 03:03:13 +0200 Subject: [PATCH 1/2] Init multipart more multipart multipart parsing Fix Entry::from_file compiling impl From for Body more multipart progress... --- Cargo.toml | 3 + src/body.rs | 23 +++- src/lib.rs | 4 + src/multipart/entry.rs | 151 +++++++++++++++++++++++++++ src/multipart/mod.rs | 231 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 411 insertions(+), 1 deletion(-) create mode 100644 src/multipart/entry.rs create mode 100644 src/multipart/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 7085d811..8e2451df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,8 @@ anyhow = "1.0.26" # features: cookies cookie = { version = "0.14.0", features = ["percent-encode"], optional = true } +futures-core = "0.3.5" +futures-util = "0.3.7" infer = "0.2.3" pin-project-lite = "0.2.0" url = { version = "2.1.1", features = ["serde"] } @@ -47,6 +49,7 @@ serde_urlencoded = "0.7.0" rand = "0.7.3" serde_qs = "0.7.0" base64 = "0.13.0" +multipart = { version = "0.17.0", default-features = false, features = ["server"] } [dev-dependencies] http = "0.2.0" diff --git a/src/body.rs b/src/body.rs index 5817565c..ae297dc9 100644 --- a/src/body.rs +++ b/src/body.rs @@ -57,6 +57,7 @@ pin_project_lite::pin_project! { mime: Mime, length: Option, bytes_read: usize + pub(crate) file_name: Option, } } @@ -80,6 +81,7 @@ impl Body { mime: mime::BYTE_STREAM, length: Some(0), bytes_read: 0, + file_name: None, } } @@ -111,6 +113,7 @@ impl Body { mime: mime::BYTE_STREAM, length: len, bytes_read: 0, + file_name: None, } } @@ -155,6 +158,7 @@ impl Body { length: Some(bytes.len()), reader: Box::new(io::Cursor::new(bytes)), bytes_read: 0, + file_name: None, } } @@ -205,6 +209,7 @@ impl Body { length: Some(s.len()), reader: Box::new(io::Cursor::new(s.into_bytes())), bytes_read: 0, + file_name: None, } } @@ -251,6 +256,7 @@ impl Body { reader: Box::new(io::Cursor::new(bytes)), mime: mime::JSON, bytes_read: 0, + file_name: None, }; Ok(body) } @@ -316,6 +322,7 @@ impl Body { reader: Box::new(io::Cursor::new(bytes)), mime: mime::FORM, bytes_read: 0, + file_name: None, }; Ok(body) } @@ -370,7 +377,7 @@ impl Body { P: AsRef, { let path = path.as_ref(); - let mut file = async_std::fs::File::open(path).await?; + let mut file = async_std::fs::File::open(&path).await?; let len = file.metadata().await?.len(); // Look at magic bytes first, look at extension second, fall back to @@ -385,6 +392,7 @@ impl Body { length: Some(len as usize), reader: Box::new(io::BufReader::new(file)), bytes_read: 0, + file_name: Some(path.to_string_lossy().to_string()), }) } @@ -419,6 +427,19 @@ impl Body { pub fn set_mime(&mut self, mime: impl Into) { self.mime = mime.into(); } + + /// Get the file name of the `Body`, if it's set. + pub fn file_name(&self) -> Option<&str> { + self.file_name.as_deref() + } + + /// Set the file name of the `Body`. + pub fn set_file_name(&mut self, file_name: Option) + where + S: AsRef, + { + self.file_name = file_name.map(|v| v.as_ref().to_owned()); + } } impl Debug for Body { diff --git a/src/lib.rs b/src/lib.rs index 480ce106..b81228ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -131,6 +131,10 @@ pub mod trace; pub mod transfer; pub mod upgrade; +cfg_unstable! { + pub mod multipart; +} + mod body; mod error; mod extensions; diff --git a/src/multipart/entry.rs b/src/multipart/entry.rs new file mode 100644 index 00000000..d6a5a8f0 --- /dev/null +++ b/src/multipart/entry.rs @@ -0,0 +1,151 @@ +use crate::{Body, Mime}; + +use std::fmt::{self, Debug}; +// use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures_lite::{io, prelude::*}; + +pin_project_lite::pin_project! { + /// A single multipart entry. + /// + /// Structurally Multipart entries are similar to `Body`. + pub struct Entry { + name: String, + body: Body, + } +} + +impl Entry { + /// Create a new `Entry`. + pub fn new(name: S, body: B) -> Self + where + S: AsRef, + B: Into, + { + Self { + name: name.as_ref().to_owned(), + body: body.into(), + } + } + + /// Create an empty `Entry`. + pub fn empty(name: S) -> Self + where + S: AsRef, + { + Self::new(name, Body::empty()) + } + + /// Create an `Entry` from a file. + #[cfg(all(feature = "async_std", not(target_os = "unknown")))] + pub async fn from_file(name: S, path: P) -> crate::Result + where + S: AsRef, + P: AsRef, + { + let body = Body::from_file(path).await?; + Ok(Self::new(name, body)) + } + + /// Get the entry name. + pub fn name(&self) -> &String { + &self.name + } + + /// Set the entry name. + pub fn set_name(&mut self, name: S) + where + S: AsRef, + { + self.name = name.as_ref().to_owned(); + } + + /// Returns the mime type of this Body. + pub fn mime(&self) -> &Mime { + self.body.mime() + } + + /// Sets the mime type of this Body. + pub fn set_mime(&mut self, mime: Mime) { + self.body.set_mime(mime) + } + + /// Get the file name of the entry, if it's set. + pub fn file_name(&self) -> Option<&str> { + self.body.file_name() + } + + /// Set the file name of the `Body`. + pub fn set_file_name

(&mut self, file_name: Option

) + where + P: AsRef, + { + self.body.set_file_name(file_name); + } +} + +impl Debug for Entry { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Entry") + .field("name", &self.name) + .field("body", &self.body) + .finish() + } +} + +impl AsyncRead for Entry { + #[allow(missing_doc_code_examples)] + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.body).poll_read(cx, buf) + } +} + +impl AsyncBufRead for Entry { + #[allow(missing_doc_code_examples)] + #[allow(unused_mut)] + #[allow(unused_variables)] + fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Pin::new(&mut self.body).poll_fill_buf(cx) + todo!("Pin::new(&mut self.body).poll_fill_buf(cx)") + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + Pin::new(&mut self.body).consume(amt) + } +} + +impl AsRef for Entry { + fn as_ref(&self) -> &Body { + &self.body + } +} + +impl AsMut for Entry { + fn as_mut(&mut self) -> &mut Body { + &mut self.body + } +} + +impl Into for Entry { + fn into(self) -> Body { + self.body + } +} + +impl From for Entry { + fn from(body: Body) -> Self { + match body.file_name.clone() { + Some(name) => Self { body, name }, + None => Self { + body, + name: String::new(), + }, + } + } +} diff --git a/src/multipart/mod.rs b/src/multipart/mod.rs new file mode 100644 index 00000000..a62b847d --- /dev/null +++ b/src/multipart/mod.rs @@ -0,0 +1,231 @@ +//! Multipart/form-data types. +//! +//! # Examples +//! +//! Request: +//! ``` +//! use http_types::multipart::{Multipart, Entry}; +//! +//! let mut req = Request::new(Method::Get, "http://example.website"); +//! +//! let mut multi = Multipart::new(); +//! multi.push(Entry::new("description", "hello world")); +//! +//! let mut entry = Entry::from_file("my_file", Body::from_file("./cats.jpeg").await?); +//! entry.set_file_name("cats.jpeg"); +//! multi.push("myFile", Body::from_file("./cats.jpeg").await?); +//! +//! req.set_body(multi); +//! ``` +//! +//! Response: +//! +//! ``` +//! use http_types::multipart::{Multipart, Entry}; +//! let mut res = Response::new(200); // get this from somewhere +//! +//! let mut entries = res.body_multipart(); +//! while let Some(entry) = entries.await { +//! println!("name: {}", entry.name()); +//! println!("data: {}", entry.into_string()?); +//! } +//! ``` + +use std::io::{Cursor, Read}; +use std::task::Context; +use std::task::Poll; +use std::{fmt::Debug, pin::Pin, str::FromStr}; + +use futures_core::stream::Stream; +use futures_lite::{io, prelude::*}; +use futures_util::stream::TryStreamExt; +use multipart::server::Multipart as Parser; + +use crate::mime; +use crate::{format_err, Body, Mime, Status}; +pub use entry::Entry; + +mod entry; + +/// A multipart response body. +pub struct Multipart { + entries: Vec, + body: Option>>, +} + +impl Debug for Multipart { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Multipart").finish() + } +} + +impl Multipart { + /// Create a new instance of `Multipart`. + pub fn new() -> Self { + Self { + entries: vec![], + body: None, + } + } + + /// Parse a `Body` stream as a `Multipart` instance. + pub async fn from_req(req: &mut crate::Request) -> crate::Result { + let boundary = req + .content_type() + .map(|ct| ct.param("boundary").cloned()) + .flatten(); + + let boundary = match boundary { + Some(boundary) => boundary.as_str().to_owned(), + None => { + let mut err = + format_err!("Invalid `Content-Type` header. Expected a `boundary` param"); + err.set_status(400); + return Err(err); + } + }; + + // Not ideal, but done for now so we can avoid implementing all of Multipart ourselves for the time being. + let body = req.take_body().into_string().await?; + + let multipart = Parser::with_body(Cursor::new(body), boundary); + Ok(Self { + entries: vec![], + body: Some(multipart), + }) + } + + /// Add a new entry to the `Multipart` instance. + pub fn push(&mut self, entry: E) + where + E: Into, + { + self.entries.push(entry.into()); + // if let Some(entries) = self.entries.as_mut() { + // entries.push(entry.into()); + // } else { + // self.entries = Some(vec![entry.into()]); + // } + } +} + +impl Stream for Multipart { + type Item = crate::Result; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + let body = match self.body.as_mut() { + None => return Poll::Ready(None), + Some(body) => body, + }; + + match body.read_entry() { + Ok(Some(mut field)) => { + let mut body = vec![]; + field.data.read_to_end(&mut body).status(400)?; + + let mut entry = Entry::new(field.headers.name, body); + entry.set_file_name(field.headers.filename); + let mime = field + .headers + .content_type + .map(|ct| Mime::from_str(&ct.to_string())) + .transpose()?; + if let Some(mime) = mime { + entry.set_mime(mime); + } else { + // https://tools.ietf.org/html/rfc7578#section-4.4 + entry.set_mime(mime::PLAIN); + } + + Poll::Ready(Some(Ok(entry))) + } + Ok(None) => Poll::Ready(None), + Err(e) => { + let mut err = format_err!("Invalid multipart entry: {}", e); + err.set_status(400); + Poll::Ready(Some(Err(err))) + } + } + } +} + +// struct MultipartReader { +// entry_iter: Box>, +// } + +// impl From for MultipartReader { +// fn from(multipart: Multipart) -> Self { +// Self { +// entry_iter: Box::new(multipart.entries.into_iter()) +// } +// } +// } + +// impl AsyncRead for MultipartReader { +// #[allow(missing_doc_code_examples)] +// fn poll_read( +// mut self: Pin<&mut Self>, +// cx: &mut Context<'_>, +// buf: &mut [u8], +// ) -> Poll> { +// if let Some(entry) = self.entry_iter.next() { +// Pin::new(&mut entry).poll_read(cx, buf) +// } else { +// Poll::Ready() +// } +// } +// } + +// impl AsyncBufRead for MultipartReader { +// #[allow(missing_doc_code_examples)] +// fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { +// let this = self.project(); +// this.reader.poll_fill_buf(cx) +// } + +// fn consume(mut self: Pin<&mut Self>, amt: usize) { +// Pin::new(&mut self.reader).consume(amt) +// } +// } + +// We need AsRef<[u8]> on BufReader for TryStreamExt (into_async_read) so... wrap and patch it in ourselves, for now. +#[doc(hidden)] +#[derive(Debug)] +pub struct BufReader { + inner: io::BufReader, +} + +#[doc(hidden)] +impl BufReader { + #[allow(missing_doc_code_examples)] + #[doc(hidden)] + pub fn new(inner: R) -> Self { + Self { + inner: io::BufReader::new(inner), + } + } +} + +#[doc(hidden)] +impl AsRef<[u8]> for BufReader { + #[allow(missing_doc_code_examples)] + #[doc(hidden)] + fn as_ref(&self) -> &[u8] { + self.inner.buffer() + } +} + +impl From for Body { + fn from(multipart: Multipart) -> Self { + let stream = multipart.map(|maybe_entry| { + maybe_entry + .map(BufReader::new) + .map_err(|err| { + std::io::Error::new(std::io::ErrorKind::Other, err.to_string()) + }) + }); + let mut body = Body::from_reader(io::BufReader::new(stream.into_async_read()), None); + body.set_mime(mime::MULTIPART_FORM); + body + } +} From d39402588a7e487101279cb5a4731087029a423e Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 18 Jan 2021 18:35:16 +0100 Subject: [PATCH 2/2] progress on multipart --- Cargo.toml | 1 - src/body.rs | 2 +- src/lib.rs | 5 +-- src/multipart/mod.rs | 98 +++++++++++++------------------------------- src/utils/mod.rs | 13 ++++++ 5 files changed, 44 insertions(+), 75 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8e2451df..ebfbfca1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,6 @@ anyhow = "1.0.26" # features: cookies cookie = { version = "0.14.0", features = ["percent-encode"], optional = true } futures-core = "0.3.5" -futures-util = "0.3.7" infer = "0.2.3" pin-project-lite = "0.2.0" url = { version = "2.1.1", features = ["serde"] } diff --git a/src/body.rs b/src/body.rs index ae297dc9..ff4bfc10 100644 --- a/src/body.rs +++ b/src/body.rs @@ -56,7 +56,7 @@ pin_project_lite::pin_project! { reader: Box, mime: Mime, length: Option, - bytes_read: usize + bytes_read: usize, pub(crate) file_name: Option, } } diff --git a/src/lib.rs b/src/lib.rs index b81228ae..24046842 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -124,6 +124,7 @@ pub mod conditional; pub mod content; pub mod headers; pub mod mime; +pub mod multipart; pub mod other; pub mod proxies; pub mod server; @@ -131,10 +132,6 @@ pub mod trace; pub mod transfer; pub mod upgrade; -cfg_unstable! { - pub mod multipart; -} - mod body; mod error; mod extensions; diff --git a/src/multipart/mod.rs b/src/multipart/mod.rs index a62b847d..08dc106c 100644 --- a/src/multipart/mod.rs +++ b/src/multipart/mod.rs @@ -1,8 +1,15 @@ //! Multipart/form-data types. //! +//! # Specifications +//! +//! [RFC 2046, section 5.1: Multipart Media Type](https://tools.ietf.org/html/rfc2046#section-5.1) +//! [RFC 2388: Returning Values from Forms: multipart/form-data](https://tools.ietf.org/html/rfc2388) +//! [RFC 7578: Returning Values from Forms: multipart/form-data](https://tools.ietf.org/html/rfc7578) +//! //! # Examples //! //! Request: +//! //! ``` //! use http_types::multipart::{Multipart, Entry}; //! @@ -27,7 +34,7 @@ //! let mut entries = res.body_multipart(); //! while let Some(entry) = entries.await { //! println!("name: {}", entry.name()); -//! println!("data: {}", entry.into_string()?); +//! println!("data: {}", entry.into_string().await?); //! } //! ``` @@ -38,7 +45,6 @@ use std::{fmt::Debug, pin::Pin, str::FromStr}; use futures_core::stream::Stream; use futures_lite::{io, prelude::*}; -use futures_util::stream::TryStreamExt; use multipart::server::Multipart as Parser; use crate::mime; @@ -133,7 +139,9 @@ impl Stream for Multipart { if let Some(mime) = mime { entry.set_mime(mime); } else { - // https://tools.ietf.org/html/rfc7578#section-4.4 + // Each part MAY have an (optional) "Content-Type" header + // field, which defaults to "text/plain". + // src: https://tools.ietf.org/html/rfc7578#section-4.4 entry.set_mime(mime::PLAIN); } @@ -149,83 +157,35 @@ impl Stream for Multipart { } } -// struct MultipartReader { -// entry_iter: Box>, -// } - -// impl From for MultipartReader { -// fn from(multipart: Multipart) -> Self { -// Self { -// entry_iter: Box::new(multipart.entries.into_iter()) -// } -// } -// } - -// impl AsyncRead for MultipartReader { -// #[allow(missing_doc_code_examples)] -// fn poll_read( -// mut self: Pin<&mut Self>, -// cx: &mut Context<'_>, -// buf: &mut [u8], -// ) -> Poll> { -// if let Some(entry) = self.entry_iter.next() { -// Pin::new(&mut entry).poll_read(cx, buf) -// } else { -// Poll::Ready() -// } -// } -// } - -// impl AsyncBufRead for MultipartReader { -// #[allow(missing_doc_code_examples)] -// fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { -// let this = self.project(); -// this.reader.poll_fill_buf(cx) -// } - -// fn consume(mut self: Pin<&mut Self>, amt: usize) { -// Pin::new(&mut self.reader).consume(amt) -// } -// } - -// We need AsRef<[u8]> on BufReader for TryStreamExt (into_async_read) so... wrap and patch it in ourselves, for now. -#[doc(hidden)] -#[derive(Debug)] -pub struct BufReader { - inner: io::BufReader, +struct MultipartReader { + entry_iter: Box>, } -#[doc(hidden)] -impl BufReader { - #[allow(missing_doc_code_examples)] - #[doc(hidden)] - pub fn new(inner: R) -> Self { +impl From for MultipartReader { + fn from(multipart: Multipart) -> Self { Self { - inner: io::BufReader::new(inner), + entry_iter: Box::new(multipart.entries.into_iter()), } } } -#[doc(hidden)] -impl AsRef<[u8]> for BufReader { +impl AsyncRead for MultipartReader { #[allow(missing_doc_code_examples)] - #[doc(hidden)] - fn as_ref(&self) -> &[u8] { - self.inner.buffer() + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + if let Some(mut entry) = self.entry_iter.next() { + Pin::new(&mut entry).poll_read(cx, buf) + } else { + todo!(); + } } } impl From for Body { - fn from(multipart: Multipart) -> Self { - let stream = multipart.map(|maybe_entry| { - maybe_entry - .map(BufReader::new) - .map_err(|err| { - std::io::Error::new(std::io::ErrorKind::Other, err.to_string()) - }) - }); - let mut body = Body::from_reader(io::BufReader::new(stream.into_async_read()), None); - body.set_mime(mime::MULTIPART_FORM); - body + fn from(_multipart: Multipart) -> Self { + todo!(); } } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 9c9ea923..c58730e5 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -40,3 +40,16 @@ pub(crate) fn sort_by_weight(props: &mut Vec) { }); *props = arr.into_iter().map(|(_, t)| t).collect::>(); } + +/// Declares unstable items. +#[allow(dead_code)] +#[doc(hidden)] +macro_rules! cfg_unstable { + ($($item:item)*) => { + $( + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + $item + )* + } +}