From 613bf5fe32cfe0c54921ec79fffcdc891ef020c8 Mon Sep 17 00:00:00 2001 From: Matthias Date: Mon, 13 Sep 2021 19:57:54 +0200 Subject: [PATCH 01/35] Move to from vec to streams Previously we collected all inputs in one vector before checking the links, which is not ideal. Especially when reading many inputs (e.g. by using a glob pattern), this could cause issues like running out of file handles. By moving to streams we avoid that scenario. This is also the first step towards improving performance for many inputs. --- Cargo.lock | 39 +++++++++++++- lychee-lib/Cargo.toml | 3 ++ lychee-lib/src/collector.rs | 40 ++++++++------ lychee-lib/src/types/error.rs | 12 +++++ lychee-lib/src/types/file.rs | 7 ++- lychee-lib/src/types/input.rs | 98 ++++++++++++++++++++++------------- 6 files changed, 143 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d13478f557..1efee23ef2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -253,6 +253,27 @@ dependencies = [ "trust-dns-resolver", ] +[[package]] +name = "async-stream" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-task" version = "4.0.3" @@ -1397,10 +1418,12 @@ dependencies = [ name = "lychee-lib" version = "0.7.2" dependencies = [ + "async-stream", "check-if-email-exists", "deadpool", "doc-comment", "fast_chemail", + "futures-core", "glob", "html5ever", "http", @@ -1420,6 +1443,7 @@ dependencies = [ "shellexpand", "tempfile", "tokio", + "tokio-stream", "typed-builder", "url", "wiremock", @@ -2558,9 +2582,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.6.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd3076b5c8cc18138b8f8814895c11eb4de37114a5d127bafdc5e55798ceef37" +checksum = "b4efe6fc2395938c8155973d7be49fe8d03a843726e285e100a8a383cc0154ce" dependencies = [ "autocfg", "bytes", @@ -2609,6 +2633,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2f3f698253f03119ac0102beaa64f67a67e08074d03a22d18784104543727f" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.6.6" diff --git a/lychee-lib/Cargo.toml b/lychee-lib/Cargo.toml index e31bd4a8dd..4050f45ed1 100644 --- a/lychee-lib/Cargo.toml +++ b/lychee-lib/Cargo.toml @@ -43,6 +43,9 @@ url = { version = "2.2.2", features = ["serde"] } log = "0.4.14" path-clean = "0.1.0" percent-encoding = "2.1.0" +async-stream = "0.3.2" +futures-core = "0.3.17" +tokio-stream = "0.1.7" [dev-dependencies] doc-comment = "0.3.3" diff --git a/lychee-lib/src/collector.rs b/lychee-lib/src/collector.rs index b5e69d96ba..886b1a4f4f 100644 --- a/lychee-lib/src/collector.rs +++ b/lychee-lib/src/collector.rs @@ -1,5 +1,6 @@ use crate::{extract::extract_links, Base, Input, Request, Result, Uri}; use std::collections::HashSet; +use tokio_stream::StreamExt; /// Collector keeps the state of link collection #[derive(Debug, Clone)] @@ -37,10 +38,12 @@ impl Collector { let sender = contents_tx.clone(); let skip_missing_inputs = self.skip_missing_inputs; - tokio::spawn(async move { - let contents = input.get_contents(None, skip_missing_inputs).await; - sender.send(contents).await - }); + + let contents = input.get_contents(None, skip_missing_inputs).await; + tokio::pin!(contents); + while let Some(content) = contents.next().await { + sender.send(content?).await?; + } } // receiver will get None once all tasks are done @@ -49,13 +52,10 @@ impl Collector { // extract links from input contents let mut extract_links_handles = vec![]; - while let Some(result) = contents_rx.recv().await { - for input_content in result? { - let base = self.base.clone(); - let handle = - tokio::task::spawn_blocking(move || extract_links(&input_content, &base)); - extract_links_handles.push(handle); - } + while let Some(content) = contents_rx.recv().await { + let base = self.base.clone(); + let handle = tokio::task::spawn_blocking(move || extract_links(&content, &base)); + extract_links_handles.push(handle); } // Note: we could dispatch links to be checked as soon as we get them, @@ -89,6 +89,7 @@ mod test { use http::StatusCode; use pretty_assertions::assert_eq; use reqwest::Url; + use tokio_stream::StreamExt; use super::*; use crate::{ @@ -105,27 +106,34 @@ mod test { const TEST_GLOB_2_MAIL: &str = "test@glob-2.io"; #[tokio::test] - #[ignore] async fn test_file_without_extension_is_plaintext() -> Result<()> { let temp_dir = tempfile::tempdir()?; // Treat as plaintext file (no extension) let file_path = temp_dir.path().join("README"); let _file = File::create(&file_path)?; let input = Input::new(&file_path.as_path().display().to_string(), true); - let contents = input.get_contents(None, true).await?; + let contents: Vec<_> = input + .get_contents(None, true) + .await + .collect::>() + .await; assert_eq!(contents.len(), 1); - assert_eq!(contents[0].file_type, FileType::Plaintext); + assert_eq!(contents[0].as_ref().unwrap().file_type, FileType::Plaintext); Ok(()) } #[tokio::test] async fn test_url_without_extension_is_html() -> Result<()> { let input = Input::new("https://example.org/", true); - let contents = input.get_contents(None, true).await?; + let contents: Vec<_> = input + .get_contents(None, true) + .await + .collect::>() + .await; assert_eq!(contents.len(), 1); - assert_eq!(contents[0].file_type, FileType::Html); + assert_eq!(contents[0].as_ref().unwrap().file_type, FileType::Html); Ok(()) } diff --git a/lychee-lib/src/types/error.rs b/lychee-lib/src/types/error.rs index 4a761416d3..237cb38ca5 100644 --- a/lychee-lib/src/types/error.rs +++ b/lychee-lib/src/types/error.rs @@ -5,6 +5,8 @@ use serde::{Serialize, Serializer}; use crate::Uri; +use super::InputContent; + /// Kinds of status errors. #[allow(clippy::module_name_repetitions)] #[derive(Debug)] @@ -42,6 +44,8 @@ pub enum ErrorKind { MissingGitHubToken, /// The website is available in HTTPS protocol, but HTTP scheme is used. InsecureURL(Uri), + /// Error while sending/receiving messages from MPSC channel + ChannelError(tokio::sync::mpsc::error::SendError), } impl PartialEq for ErrorKind { @@ -85,6 +89,7 @@ impl Hash for ErrorKind { Self::InvalidHeader(e) => e.to_string().hash(state), Self::InvalidGlobPattern(e) => e.to_string().hash(state), Self::MissingGitHubToken => std::mem::discriminant(self).hash(state), + Self::ChannelError(e) => e.to_string().hash(state), } } } @@ -128,6 +133,7 @@ impl Display for ErrorKind { ), Self::InvalidBase(base, e) => write!(f, "Error with base dir `{}` : {}", base, e), Self::Utf8Error(e) => e.fmt(f), + Self::ChannelError(e) => e.fmt(f), } } } @@ -207,6 +213,12 @@ impl From for ErrorKind { } } +impl From> for ErrorKind { + fn from(e: tokio::sync::mpsc::error::SendError) -> Self { + Self::ChannelError(e) + } +} + impl From for ErrorKind { fn from(_: Infallible) -> Self { // tautological diff --git a/lychee-lib/src/types/file.rs b/lychee-lib/src/types/file.rs index d0d9510024..b080df7a57 100644 --- a/lychee-lib/src/types/file.rs +++ b/lychee-lib/src/types/file.rs @@ -28,9 +28,14 @@ impl> From

for FileType { // Unfortunately that's not possible without refactoring, as // `AsRef` could be implemented for `Url` in the future, which is why // `From for FileType` is not allowed. + // As a workaround, we check if we got a known web-protocol + let is_url = path.starts_with("http"); + match path.extension().and_then(std::ffi::OsStr::to_str) { Some("md" | "markdown") => FileType::Markdown, - Some("htm" | "html") | None => FileType::Html, + Some("htm" | "html") => FileType::Html, + None if is_url => FileType::Html, + None => FileType::Plaintext, Some(_) => FileType::Plaintext, } } diff --git a/lychee-lib/src/types/input.rs b/lychee-lib/src/types/input.rs index ad97355dd2..c13dc071a4 100644 --- a/lychee-lib/src/types/input.rs +++ b/lychee-lib/src/types/input.rs @@ -1,11 +1,13 @@ use crate::types::FileType; use crate::Result; +use async_stream::try_stream; +use futures_core::stream::Stream; use glob::glob_with; use reqwest::Url; use serde::Serialize; use shellexpand::tilde; +use std::fmt::Display; use std::path::{Path, PathBuf}; -use std::{fmt::Display, fs::read_to_string}; use tokio::io::{stdin, AsyncReadExt}; const STDIN: &str = "-"; @@ -112,24 +114,40 @@ impl Input { &self, file_type_hint: Option, skip_missing: bool, - ) -> Result> { - match *self { - // TODO: should skip_missing also affect URLs? - Input::RemoteUrl(ref url) => Ok(vec![Self::url_contents(url).await?]), - Input::FsGlob { - ref pattern, - ignore_case, - } => Ok(Self::glob_contents(pattern, ignore_case).await?), - Input::FsPath(ref path) => { - let content = Self::path_content(path); - match content { - Ok(input_content) => Ok(vec![input_content]), - Err(_) if skip_missing => Ok(vec![]), - Err(e) => Err(e), + ) -> impl Stream> + '_ { + try_stream! { + match *self { + // TODO: should skip_missing also affect URLs? + Input::RemoteUrl(ref url) => { + let contents: InputContent = Self::url_contents(url).await?; + yield contents; + }, + Input::FsGlob { + ref pattern, + ignore_case, + } => { + for await content in Self::glob_contents(pattern, ignore_case).await { + let content = content?; + yield content; + } } + Input::FsPath(ref path) => { + let content = Self::path_content(path).await; + match content { + Err(_) if skip_missing => (), + Err(e) => Err(e)?, + Ok(content) => yield content, + }; + }, + Input::Stdin => { + let contents = Self::stdin_content(file_type_hint).await?; + yield contents; + }, + Input::String(ref s) => { + let contents = Self::string_content(s, file_type_hint); + yield contents; + }, } - Input::Stdin => Ok(vec![Self::stdin_content(file_type_hint).await?]), - Input::String(ref s) => Ok(vec![Self::string_content(s, file_type_hint)]), } } @@ -151,40 +169,46 @@ impl Input { Ok(input_content) } - async fn glob_contents(path_glob: &str, ignore_case: bool) -> Result> { - let mut contents = vec![]; - let glob_expanded = tilde(&path_glob); + async fn glob_contents( + path_glob: &str, + ignore_case: bool, + ) -> impl Stream> + '_ { + let glob_expanded = tilde(&path_glob).to_string(); let mut match_opts = glob::MatchOptions::new(); match_opts.case_sensitive = !ignore_case; - for entry in glob_with(&glob_expanded, match_opts)? { - match entry { - Ok(path) => { - if path.is_dir() { - // Directories can still have a suffix which looks like - // a file extension like `foo.html`. This can lead to - // unexpected behavior with glob patterns like - // `**/*.html`. Therefore filter these out. - // https://github.com/lycheeverse/lychee/pull/262#issuecomment-913226819 - continue; + try_stream! { + for entry in glob_with(&glob_expanded, match_opts)? { + match entry { + Ok(path) => { + if path.is_dir() { + // Directories can still have a suffix which looks like + // a file extension like `foo.html`. This can lead to + // unexpected behavior with glob patterns like + // `**/*.html`. Therefore filter these out. + // https://github.com/lycheeverse/lychee/pull/262#issuecomment-913226819 + continue; + } + let content: InputContent = Self::path_content(&path).await?; + yield content; } - let content = Self::path_content(&path)?; - contents.push(content); + Err(e) => println!("{:?}", e), } - Err(e) => println!("{:?}", e), } } - - Ok(contents) } /// Get the input content of a given path /// # Errors /// /// Will return `Err` if file contents can't be read - pub fn path_content + AsRef + Clone>(path: P) -> Result { - let content = read_to_string(&path).map_err(|e| (path.clone().into(), e))?; + pub async fn path_content + AsRef + Clone>( + path: P, + ) -> Result { + let content = tokio::fs::read_to_string(&path) + .await + .map_err(|e| (path.clone().into(), e))?; let input_content = InputContent { file_type: FileType::from(path.as_ref()), content, From 0c05acb8af149b377cada8cb0f642fba65e82e2a Mon Sep 17 00:00:00 2001 From: Matthias Date: Tue, 14 Sep 2021 16:11:28 +0200 Subject: [PATCH 02/35] Fix formatting and lints --- lychee-lib/src/types/file.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lychee-lib/src/types/file.rs b/lychee-lib/src/types/file.rs index b080df7a57..3d9d427d66 100644 --- a/lychee-lib/src/types/file.rs +++ b/lychee-lib/src/types/file.rs @@ -33,10 +33,9 @@ impl> From

for FileType { match path.extension().and_then(std::ffi::OsStr::to_str) { Some("md" | "markdown") => FileType::Markdown, - Some("htm" | "html") => FileType::Html, + Some("htm" | "html") => FileType::Html, None if is_url => FileType::Html, - None => FileType::Plaintext, - Some(_) => FileType::Plaintext, + _ => FileType::Plaintext, } } } From a05400b28bf13baf40cf0b6a3af3f34b29c0967b Mon Sep 17 00:00:00 2001 From: Timo Freiberg Date: Sun, 26 Sep 2021 15:23:28 +0200 Subject: [PATCH 03/35] Return collected links as Stream --- Cargo.lock | 3 + examples/collect_links/Cargo.toml | 3 +- examples/collect_links/collect_links.rs | 3 + lychee-bin/Cargo.toml | 2 + lychee-bin/src/main.rs | 6 +- lychee-lib/src/collector.rs | 83 ++++++++++++------------- 6 files changed, 54 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e01f03780d..8583c5dbd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -497,6 +497,7 @@ dependencies = [ "regex", "reqwest", "tokio", + "tokio-stream", ] [[package]] @@ -1473,6 +1474,7 @@ dependencies = [ "assert_cmd", "console", "console-subscriber", + "futures", "headers", "http", "indicatif", @@ -1490,6 +1492,7 @@ dependencies = [ "structopt", "tempfile", "tokio", + "tokio-stream", "toml", "tracing-subscriber", "uuid", diff --git a/examples/collect_links/Cargo.toml b/examples/collect_links/Cargo.toml index 465fb99de4..defd3723ab 100644 --- a/examples/collect_links/Cargo.toml +++ b/examples/collect_links/Cargo.toml @@ -12,4 +12,5 @@ lychee-lib = { path = "../../lychee-lib", version = "0.7.2" } tokio = { version = "1.12.0", features = ["full"] } regex = "1.4.6" http = "0.2.5" -reqwest = { version = "0.11.3", features = ["gzip"] } \ No newline at end of file +reqwest = { version = "0.11.3", features = ["gzip"] } +tokio-stream = "0.1.7" diff --git a/examples/collect_links/collect_links.rs b/examples/collect_links/collect_links.rs index 60c37f9291..901f7d40e5 100644 --- a/examples/collect_links/collect_links.rs +++ b/examples/collect_links/collect_links.rs @@ -1,6 +1,7 @@ use lychee_lib::{Collector, Input, Result}; use reqwest::Url; use std::path::PathBuf; +use tokio_stream::StreamExt; #[tokio::main] #[allow(clippy::trivial_regex)] @@ -21,6 +22,8 @@ async fn main() -> Result<()> { .collect_links( inputs, // base url or directory ) + .await + .collect::>>() .await?; dbg!(links); diff --git a/lychee-bin/Cargo.toml b/lychee-bin/Cargo.toml index 208c5b5481..c7a5707a24 100644 --- a/lychee-bin/Cargo.toml +++ b/lychee-bin/Cargo.toml @@ -38,6 +38,8 @@ serde_json = "1.0.68" structopt = "0.3.21" tokio = { version = "1.12.0", features = ["full"] } toml = "0.5.8" +futures = "0.3.17" +tokio-stream = "0.1.7" [dev-dependencies] assert_cmd = "2.0.1" diff --git a/lychee-bin/src/main.rs b/lychee-bin/src/main.rs index c4f46737f3..564246f594 100644 --- a/lychee-bin/src/main.rs +++ b/lychee-bin/src/main.rs @@ -67,6 +67,7 @@ use std::iter::FromIterator; use std::{collections::HashSet, fs, str::FromStr, time::Duration}; use anyhow::{anyhow, Context, Result}; +use futures::stream::TryStreamExt; use headers::{authorization::Basic, Authorization, HeaderMap, HeaderMapExt, HeaderName}; use http::StatusCode; use indicatif::{ProgressBar, ProgressStyle}; @@ -76,6 +77,7 @@ use regex::RegexSet; use ring as _; // required for apple silicon use structopt::StructOpt; use tokio::sync::mpsc; +use tokio_stream::StreamExt; mod options; mod stats; @@ -209,7 +211,9 @@ async fn run(cfg: &Config, inputs: Vec) -> Result { let links = Collector::new(cfg.base.clone(), cfg.skip_missing, max_concurrency) .collect_links(&inputs) .await - .map_err(|e| anyhow!(e))?; + .map_err(|e| anyhow!(e)) + .collect::>>() + .await?; if cfg.dump { let exit_code = dump_links(links.iter().filter(|link| !client.filtered(&link.uri))); diff --git a/lychee-lib/src/collector.rs b/lychee-lib/src/collector.rs index 886b1a4f4f..47b49b4e42 100644 --- a/lychee-lib/src/collector.rs +++ b/lychee-lib/src/collector.rs @@ -1,4 +1,6 @@ use crate::{extract::extract_links, Base, Input, Request, Result, Uri}; +use async_stream::try_stream; +use futures_core::Stream; use std::collections::HashSet; use tokio_stream::StreamExt; @@ -30,55 +32,45 @@ impl Collector { /// # Errors /// /// Will return `Err` if links cannot be extracted from an input - pub async fn collect_links(mut self, inputs: &[Input]) -> Result> { - let (contents_tx, mut contents_rx) = tokio::sync::mpsc::channel(self.max_concurrency); + pub async fn collect_links(mut self, inputs: &[Input]) -> impl Stream> + '_ { + try_stream! { + let (contents_tx, mut contents_rx) = tokio::sync::mpsc::channel(self.max_concurrency); - // extract input contents - for input in inputs.iter().cloned() { - let sender = contents_tx.clone(); + // extract input contents + for input in inputs.iter().cloned() { + let sender = contents_tx.clone(); - let skip_missing_inputs = self.skip_missing_inputs; + let skip_missing_inputs = self.skip_missing_inputs; - let contents = input.get_contents(None, skip_missing_inputs).await; - tokio::pin!(contents); - while let Some(content) = contents.next().await { - sender.send(content?).await?; + let contents = input.get_contents(None, skip_missing_inputs).await; + tokio::pin!(contents); + while let Some(content) = contents.next().await { + sender.send(content?).await?; + } } - } - - // receiver will get None once all tasks are done - drop(contents_tx); - // extract links from input contents - let mut extract_links_handles = vec![]; + // receiver will get None once all tasks are done + drop(contents_tx); - while let Some(content) = contents_rx.recv().await { - let base = self.base.clone(); - let handle = tokio::task::spawn_blocking(move || extract_links(&content, &base)); - extract_links_handles.push(handle); - } + // extract links from input contents + let mut extract_links_handles = vec![]; - // Note: we could dispatch links to be checked as soon as we get them, - // instead of building a HashSet with all links. - // This optimization would speed up cases where there's - // a lot of inputs and/or the inputs are large (e.g. big files). - let mut links: HashSet = HashSet::new(); + while let Some(content) = contents_rx.recv().await { + let base = self.base.clone(); + let handle = tokio::task::spawn_blocking(move || extract_links(&content, &base)); + extract_links_handles.push(handle); + } - for handle in extract_links_handles { - let new_links = handle.await?; - links.extend(new_links?); + for handle in extract_links_handles { + let new_links = handle.await??; + for link in new_links { + if !self.cache.contains(&link.uri) { + self.cache.insert(link.uri.clone()); + yield link; + } + } + } } - - // Filter out already cached links (duplicates) - links.retain(|l| !self.cache.contains(&l.uri)); - - self.update_cache(&links); - Ok(links) - } - - /// Update internal link cache - fn update_cache(&mut self, links: &HashSet) { - self.cache.extend(links.iter().cloned().map(|l| l.uri)); } } @@ -168,10 +160,13 @@ mod test { }, ]; - let responses = Collector::new(None, false, 8) - .collect_links(&inputs) - .await?; - let mut links = responses.into_iter().map(|r| r.uri).collect::>(); + let responses = Collector::new(None, false, 8).collect_links(&inputs).await; + let mut links = responses + .collect::>>() + .await? + .into_iter() + .map(|r| r.uri) + .collect::>(); let mut expected_links = vec![ website(TEST_STRING), From d4b9badd44dfff0c7df08d892856e69396f3c731 Mon Sep 17 00:00:00 2001 From: Timo Freiberg Date: Wed, 6 Oct 2021 14:15:12 +0200 Subject: [PATCH 04/35] Initialize ProgressBar without length Because we can't know the amount of links without blocking --- lychee-bin/src/main.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lychee-bin/src/main.rs b/lychee-bin/src/main.rs index 564246f594..386f9df3fe 100644 --- a/lychee-bin/src/main.rs +++ b/lychee-bin/src/main.rs @@ -223,10 +223,11 @@ async fn run(cfg: &Config, inputs: Vec) -> Result { let pb = if cfg.no_progress { None } else { - let bar = - ProgressBar::new(links.len() as u64).with_style(ProgressStyle::default_bar().template( - "{spinner:.red.bright} {pos}/{len:.dim} [{elapsed_precise}] {bar:25} {wide_msg}", - )); + let bar = ProgressBar::new_spinner().with_style(ProgressStyle::default_bar().template( + "{spinner:.red.bright} {pos}/{len:.dim} [{elapsed_precise}] {bar:25} {wide_msg}", + )); + bar.set_length(0); + bar.set_message("Extracting links"); bar.enable_steady_tick(100); Some(bar) }; From 98cdfba9585016205b8e9b718167b688ecfc56e2 Mon Sep 17 00:00:00 2001 From: Timo Freiberg Date: Wed, 6 Oct 2021 14:16:24 +0200 Subject: [PATCH 05/35] Handle stream results in main thread, not in task To stay as close to the pre-stream behaviour, we want to stop processing as soon as an Err value appears in the stream. This is easiest when the stream is consumed in the main thread. Previously, the stream was consumed in a tokio task and the main thread waited for responses. Now, a tokio task waits for responses (and displays them/registers response stats) and the main thread sends links to the ClientPool. To ensure that the main thread waits for all responses to have arrived before finishing the ProgressBar and printing the stats, it waits for the show_results_task to finish. --- lychee-bin/src/main.rs | 46 +++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/lychee-bin/src/main.rs b/lychee-bin/src/main.rs index 386f9df3fe..caffaf5cce 100644 --- a/lychee-bin/src/main.rs +++ b/lychee-bin/src/main.rs @@ -67,7 +67,7 @@ use std::iter::FromIterator; use std::{collections::HashSet, fs, str::FromStr, time::Duration}; use anyhow::{anyhow, Context, Result}; -use futures::stream::TryStreamExt; +use futures::{pin_mut, stream::TryStreamExt}; use headers::{authorization::Basic, Authorization, HeaderMap, HeaderMapExt, HeaderName}; use http::StatusCode; use indicatif::{ProgressBar, ProgressStyle}; @@ -211,12 +211,16 @@ async fn run(cfg: &Config, inputs: Vec) -> Result { let links = Collector::new(cfg.base.clone(), cfg.skip_missing, max_concurrency) .collect_links(&inputs) .await - .map_err(|e| anyhow!(e)) - .collect::>>() - .await?; + .map_err(|e| anyhow!(e)); if cfg.dump { - let exit_code = dump_links(links.iter().filter(|link| !client.filtered(&link.uri))); + let exit_code = dump_links( + links + .collect::>>() + .await? + .iter() + .filter(|link| !client.filtered(&link.uri)), + ); return Ok(exit_code as i32); } @@ -238,14 +242,6 @@ async fn run(cfg: &Config, inputs: Vec) -> Result { let mut stats = ResponseStats::new(); let bar = pb.clone(); - tokio::spawn(async move { - for link in links { - if let Some(pb) = &bar { - pb.set_message(&link.to_string()); - }; - send_req.send(link).await.unwrap(); - } - }); // Start receiving requests tokio::spawn(async move { @@ -254,11 +250,29 @@ async fn run(cfg: &Config, inputs: Vec) -> Result { clients.listen().await; }); - while let Some(response) = recv_resp.recv().await { - show_progress(&pb, &response, cfg.verbose); - stats.add(response); + let show_results_task = tokio::spawn({ + let verbose = cfg.verbose; + async move { + while let Some(response) = recv_resp.recv().await { + show_progress(&pb, &response, verbose); + stats.add(response); + } + (pb, stats) + } + }); + + pin_mut!(links); + while let Some(link) = links.next().await { + let link = link?; + if let Some(pb) = &bar { + pb.inc_length(1); + pb.set_message(&link.to_string()); + }; + send_req.send(link).await.unwrap(); } + let (pb, stats) = show_results_task.await?; + // Note that print statements may interfere with the progress bar, so this // must go before printing the stats if let Some(pb) = &pb { From 1471725f52ad69d48ce90202f40564aa7abbd25e Mon Sep 17 00:00:00 2001 From: Matthias Date: Thu, 7 Oct 2021 18:39:39 +0200 Subject: [PATCH 06/35] Cleanup --- lychee-bin/src/main.rs | 22 ++++++++++++---------- lychee-lib/src/collector.rs | 4 ++-- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/lychee-bin/src/main.rs b/lychee-bin/src/main.rs index c41c79b4ff..bdb1b9e141 100644 --- a/lychee-bin/src/main.rs +++ b/lychee-bin/src/main.rs @@ -68,10 +68,9 @@ use std::{collections::HashSet, fs, str::FromStr}; use anyhow::{anyhow, Context, Result}; use futures::{pin_mut, stream::TryStreamExt}; -use headers::{authorization::Basic, Authorization, HeaderMap, HeaderMapExt, HeaderName}; -use http::StatusCode; +use headers::HeaderMapExt; use indicatif::{ProgressBar, ProgressStyle}; -use lychee_lib::{ClientBuilder, Collector, Input, Request, Response}; +use lychee_lib::{Client, ClientBuilder, Collector, Input, Request, Response}; use openssl_sys as _; // required for vendored-openssl feature use regex::RegexSet; use ring as _; // required for apple silicon @@ -167,7 +166,7 @@ fn fmt(stats: &ResponseStats, format: &Format) -> Result { }) } -async fn run(cfg: &Config, inputs: Vec) -> Result { +fn create_client(cfg: &Config) -> Result { let mut headers = parse_headers(&cfg.headers)?; if let Some(auth) = &cfg.basic_auth { let auth_header = parse_basic_auth(auth)?; @@ -176,7 +175,6 @@ async fn run(cfg: &Config, inputs: Vec) -> Result { let accepted = cfg.accept.clone().and_then(|a| parse_statuscodes(&a).ok()); let timeout = parse_timeout(cfg.timeout); - let max_concurrency = cfg.max_concurrency; let method: reqwest::Method = reqwest::Method::from_str(&cfg.method.to_uppercase())?; let include = RegexSet::new(&cfg.include)?; let exclude = RegexSet::new(&cfg.exclude)?; @@ -188,7 +186,7 @@ async fn run(cfg: &Config, inputs: Vec) -> Result { cfg.scheme.clone() }; - let client = ClientBuilder::builder() + ClientBuilder::builder() .includes(include) .excludes(exclude) .exclude_all_private(cfg.exclude_all_private) @@ -208,9 +206,13 @@ async fn run(cfg: &Config, inputs: Vec) -> Result { .require_https(cfg.require_https) .build() .client() - .map_err(|e| anyhow!(e))?; + .map_err(|e| anyhow!(e)) +} + +async fn run(cfg: &Config, inputs: Vec) -> Result { + let client = create_client(cfg)?; - let links = Collector::new(cfg.base.clone(), cfg.skip_missing, max_concurrency) + let links = Collector::new(cfg.base.clone(), cfg.skip_missing, cfg.max_concurrency) .collect_links(&inputs) .await .map_err(|e| anyhow!(e)); @@ -238,8 +240,8 @@ async fn run(cfg: &Config, inputs: Vec) -> Result { Some(bar) }; - let (send_req, mut recv_req) = mpsc::channel(max_concurrency); - let (send_resp, mut recv_resp) = mpsc::channel(max_concurrency); + let (send_req, mut recv_req) = mpsc::channel(cfg.max_concurrency); + let (send_resp, mut recv_resp) = mpsc::channel(cfg.max_concurrency); let mut stats = ResponseStats::new(); diff --git a/lychee-lib/src/collector.rs b/lychee-lib/src/collector.rs index 091abc5378..d3aa8315d8 100644 --- a/lychee-lib/src/collector.rs +++ b/lychee-lib/src/collector.rs @@ -1,6 +1,6 @@ +use crate::{extract::Extractor, Base, Input, Request, Result}; use async_stream::try_stream; use futures_core::Stream; -use crate::{extract::Extractor, Base, Input, Request, Result}; use tokio_stream::StreamExt; /// Collector keeps the state of link collection @@ -33,7 +33,7 @@ impl Collector { /// # Errors /// /// Will return `Err` if links cannot be extracted from an input - pub async fn collect_links(mut self, inputs: &[Input]) -> impl Stream> + '_ { + pub async fn collect_links(self, inputs: &[Input]) -> impl Stream> + '_ { try_stream! { let (contents_tx, mut contents_rx) = tokio::sync::mpsc::channel(self.max_concurrency); From dfd0735082af1dc6ef4b8ddf51c0f1bbad4889b7 Mon Sep 17 00:00:00 2001 From: Matthias Date: Sun, 10 Oct 2021 02:40:07 +0200 Subject: [PATCH 07/35] Add basic directory support --- Cargo.lock | 102 ++++++++++++++++++++++++++++++-- lychee-bin/tests/local_files.rs | 27 +++++++++ lychee-lib/Cargo.toml | 1 + lychee-lib/src/types/error.rs | 10 ++++ lychee-lib/src/types/input.rs | 51 +++++++++++----- 5 files changed, 171 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 09974ee031..65cdcc62ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -632,11 +632,59 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae5588f6b3c3cb05239e90bd110f257254aecd01e4635400391aeae07497845" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "lazy_static", + "memoffset", + "scopeguard", +] + [[package]] name = "crossbeam-queue" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f6cb3c7f5b8e51bc3ebb73a2327ad4abdbd119dc13223f14f961d2f38486756" +checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9" dependencies = [ "cfg-if", "crossbeam-utils", @@ -644,11 +692,10 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.3" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49" +checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" dependencies = [ - "autocfg", "cfg-if", "lazy_static", ] @@ -1459,6 +1506,16 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "jwalk" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "172752e853a067cbce46427de8470ddf308af7fd8ceaf9b682ef31a5021b6bb9" +dependencies = [ + "crossbeam", + "rayon", +] + [[package]] name = "kv-log-macro" version = "1.0.7" @@ -1591,6 +1648,7 @@ dependencies = [ "html5ever", "http", "hubcaps", + "jwalk", "linkify", "log", "markup5ever_rcdom", @@ -1681,6 +1739,15 @@ version = "2.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" +[[package]] +name = "memoffset" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.16" @@ -2354,6 +2421,31 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rayon" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90" +dependencies = [ + "autocfg", + "crossbeam-deque", + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "lazy_static", + "num_cpus", +] + [[package]] name = "redox_syscall" version = "0.2.6" diff --git a/lychee-bin/tests/local_files.rs b/lychee-bin/tests/local_files.rs index 11574e172d..751d3d5c81 100644 --- a/lychee-bin/tests/local_files.rs +++ b/lychee-bin/tests/local_files.rs @@ -33,4 +33,31 @@ mod cli { Ok(()) } + + #[tokio::test] + async fn test_local_dir() -> Result<()> { + let dir = tempfile::tempdir()?; + let index_path = dir.path().join("index.html"); + let mut index = File::create(&index_path)?; + writeln!(index, r#"Foo"#)?; + writeln!(index, r#"Bar"#)?; + + let foo_path = dir.path().join("foo.html"); + File::create(&foo_path)?; + let bar_path = dir.path().join("bar.md"); + File::create(&bar_path)?; + + let mut cmd = main_command(); + cmd.arg(dir.path()) + .arg("--no-progress") + .arg("--verbose") + .env_clear() + .assert() + .success() + .stdout(contains("Total............2")) + .stdout(contains("foo.html")) + .stdout(contains("bar.md")); + + Ok(()) + } } diff --git a/lychee-lib/Cargo.toml b/lychee-lib/Cargo.toml index d7d09f53fc..2bee002588 100644 --- a/lychee-lib/Cargo.toml +++ b/lychee-lib/Cargo.toml @@ -48,6 +48,7 @@ futures-core = "0.3.17" tokio-stream = "0.1.7" cached = "0.25.0" once_cell = "1.8.0" +jwalk = "0.6.0" [dev-dependencies] doc-comment = "0.3.3" diff --git a/lychee-lib/src/types/error.rs b/lychee-lib/src/types/error.rs index 9949f30c1f..f4d3d8f705 100644 --- a/lychee-lib/src/types/error.rs +++ b/lychee-lib/src/types/error.rs @@ -38,6 +38,8 @@ pub enum ErrorKind { InvalidBase(String, String), /// Cannot find local file FileNotFound(PathBuf), + /// Error while traversing an input directory + DirTraversal(jwalk::Error), /// The given UNIX glob pattern is invalid InvalidGlobPattern(glob::PatternError), /// The Github API could not be called because of a missing Github token. @@ -80,6 +82,7 @@ impl Hash for ErrorKind { Self::IoError(p, e) => (p, e.kind()).hash(state), Self::ReqwestError(e) => e.to_string().hash(state), Self::HubcapsError(e) => e.to_string().hash(state), + Self::DirTraversal(e) => e.to_string().hash(state), Self::FileNotFound(e) => e.to_string_lossy().hash(state), Self::UrlParseError(s, e) => (s, e.type_id()).hash(state), Self::InvalidURI(u) => u.hash(state), @@ -109,6 +112,7 @@ impl Display for ErrorKind { Self::IoError(None, e) => e.fmt(f), Self::ReqwestError(e) => e.fmt(f), Self::HubcapsError(e) => e.fmt(f), + Self::DirTraversal(e) => e.fmt(f), Self::FileNotFound(e) => write!(f, "{}", e.to_string_lossy()), Self::UrlParseError(s, (url_err, Some(mail_err))) => { write!( @@ -165,6 +169,12 @@ impl From for ErrorKind { } } +impl From for ErrorKind { + fn from(e: jwalk::Error) -> Self { + Self::DirTraversal(e) + } +} + impl From for ErrorKind { fn from(e: std::io::Error) -> Self { Self::IoError(None, e) diff --git a/lychee-lib/src/types/input.rs b/lychee-lib/src/types/input.rs index c13dc071a4..919586a024 100644 --- a/lychee-lib/src/types/input.rs +++ b/lychee-lib/src/types/input.rs @@ -3,6 +3,7 @@ use crate::Result; use async_stream::try_stream; use futures_core::stream::Stream; use glob::glob_with; +use jwalk::WalkDir; use reqwest::Url; use serde::Serialize; use shellexpand::tilde; @@ -132,25 +133,45 @@ impl Input { } } Input::FsPath(ref path) => { - let content = Self::path_content(path).await; - match content { - Err(_) if skip_missing => (), - Err(e) => Err(e)?, - Ok(content) => yield content, - }; + if path.is_dir() { + for entry in WalkDir::new(path).skip_hidden(true) { + let entry = entry?; + if entry.file_type().is_dir() { + continue; + } + if !Self::valid_extension(&entry.path()) { + continue + } + let content = Self::path_content(entry.path()).await?; + yield content; + } + } else { + let content = Self::path_content(path).await; + match content { + Err(_) if skip_missing => (), + Err(e) => Err(e)?, + Ok(content) => yield content, + }; + } }, Input::Stdin => { - let contents = Self::stdin_content(file_type_hint).await?; - yield contents; + let content = Self::stdin_content(file_type_hint).await?; + yield content; }, Input::String(ref s) => { - let contents = Self::string_content(s, file_type_hint); - yield contents; + let content = Self::string_content(s, file_type_hint); + yield content; }, } } } + // Check the extension of the given path against the list of known/accepted + // file extensions + fn valid_extension(p: &PathBuf) -> bool { + matches!(FileType::from(p), FileType::Markdown | FileType::Html) + } + async fn url_contents(url: &Url) -> Result { // Assume HTML for default paths let file_type = if url.path().is_empty() || url.path() == "/" { @@ -182,12 +203,12 @@ impl Input { for entry in glob_with(&glob_expanded, match_opts)? { match entry { Ok(path) => { + // Directories can have a suffix which looks like + // a file extension (like `foo.html`). This can lead to + // unexpected behavior with glob patterns like + // `**/*.html`. Therefore filter these out. + // See https://github.com/lycheeverse/lychee/pull/262#issuecomment-913226819 if path.is_dir() { - // Directories can still have a suffix which looks like - // a file extension like `foo.html`. This can lead to - // unexpected behavior with glob patterns like - // `**/*.html`. Therefore filter these out. - // https://github.com/lycheeverse/lychee/pull/262#issuecomment-913226819 continue; } let content: InputContent = Self::path_content(&path).await?; From 5f790bfe5ca8a0503fa6defb5223a1988b889404 Mon Sep 17 00:00:00 2001 From: Timo Freiberg Date: Sat, 9 Oct 2021 18:30:50 +0200 Subject: [PATCH 08/35] Fix deadlock --- lychee-bin/src/main.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lychee-bin/src/main.rs b/lychee-bin/src/main.rs index 6f406a52b7..0428138e6c 100644 --- a/lychee-bin/src/main.rs +++ b/lychee-bin/src/main.rs @@ -67,7 +67,7 @@ use std::iter::FromIterator; use std::{collections::HashSet, fs, str::FromStr}; use anyhow::{anyhow, Context, Result}; -use futures::{pin_mut, stream::TryStreamExt}; +use futures::stream::TryStreamExt; use headers::HeaderMapExt; use indicatif::{ProgressBar, ProgressStyle}; use lychee_lib::{Client, ClientBuilder, ClientPool, Collector, Input, Request, Response}; @@ -266,7 +266,7 @@ async fn run(cfg: &Config, inputs: Vec) -> Result { } }); - pin_mut!(links); + tokio::pin!(links); while let Some(link) = links.next().await { let link = link?; if let Some(pb) = &bar { @@ -275,6 +275,9 @@ async fn run(cfg: &Config, inputs: Vec) -> Result { }; send_req.send(link).await.unwrap(); } + // required for the receiver task to end, which closes send_resp, which allows + // the show_results_task to finish + drop(send_req); let (pb, stats) = show_results_task.await?; From d42bf3e3ee224bdb9a6fa5033ed47fc973f38422 Mon Sep 17 00:00:00 2001 From: Matthias Date: Sun, 10 Oct 2021 02:58:53 +0200 Subject: [PATCH 09/35] Clippy --- lychee-lib/src/types/input.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lychee-lib/src/types/input.rs b/lychee-lib/src/types/input.rs index 919586a024..5a7e2a0e6b 100644 --- a/lychee-lib/src/types/input.rs +++ b/lychee-lib/src/types/input.rs @@ -168,7 +168,7 @@ impl Input { // Check the extension of the given path against the list of known/accepted // file extensions - fn valid_extension(p: &PathBuf) -> bool { + fn valid_extension(p: &Path) -> bool { matches!(FileType::from(p), FileType::Markdown | FileType::Html) } From 5bea0c811a22c0d4a0fae38cf474c02afcac77c1 Mon Sep 17 00:00:00 2001 From: Matthias Date: Sun, 10 Oct 2021 15:36:34 +0200 Subject: [PATCH 10/35] Add test for http protocol file type --- lychee-lib/src/types/file.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lychee-lib/src/types/file.rs b/lychee-lib/src/types/file.rs index 14f869d262..05ff157289 100644 --- a/lychee-lib/src/types/file.rs +++ b/lychee-lib/src/types/file.rs @@ -64,6 +64,9 @@ mod tests { assert_eq!(FileType::from(Path::new("test.htm")), FileType::Html); assert_eq!(FileType::from(Path::new("index.html")), FileType::Html); - assert_eq!(FileType::from(Path::new("http://foo.com/index.html")), FileType::Html); + assert_eq!( + FileType::from(Path::new("http://foo.com/index.html")), + FileType::Html + ); } } From 8ea4de684f6ece6ffaa942e7db51319cb0a0127a Mon Sep 17 00:00:00 2001 From: Matthias Date: Sun, 10 Oct 2021 15:37:22 +0200 Subject: [PATCH 11/35] Remove deadpool (once again) Replaced with `futures::StreamExt::for_each_concurrent`. --- Cargo.lock | 1 - lychee-bin/src/main.rs | 15 ++++++++--- lychee-lib/Cargo.toml | 1 - lychee-lib/src/client_pool.rs | 49 ----------------------------------- lychee-lib/src/lib.rs | 2 -- 5 files changed, 11 insertions(+), 57 deletions(-) delete mode 100644 lychee-lib/src/client_pool.rs diff --git a/Cargo.lock b/Cargo.lock index 6c7ec9806a..5fae557ce0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1640,7 +1640,6 @@ dependencies = [ "async-stream", "cached", "check-if-email-exists", - "deadpool", "doc-comment", "fast_chemail", "futures-core", diff --git a/lychee-bin/src/main.rs b/lychee-bin/src/main.rs index 0428138e6c..c7915c5d46 100644 --- a/lychee-bin/src/main.rs +++ b/lychee-bin/src/main.rs @@ -60,6 +60,7 @@ // required for apple silicon use ring as _; +use tokio_stream::wrappers::ReceiverStream; use std::fs::File; use std::io::{self, BufRead, Write}; @@ -70,7 +71,7 @@ use anyhow::{anyhow, Context, Result}; use futures::stream::TryStreamExt; use headers::HeaderMapExt; use indicatif::{ProgressBar, ProgressStyle}; -use lychee_lib::{Client, ClientBuilder, ClientPool, Collector, Input, Request, Response}; +use lychee_lib::{Client, ClientBuilder, Collector, Input, Request, Response}; use openssl_sys as _; // required for vendored-openssl feature use regex::RegexSet; use ring as _; // required for apple silicon @@ -250,9 +251,15 @@ async fn run(cfg: &Config, inputs: Vec) -> Result { // Start receiving requests tokio::spawn(async move { - let clients = vec![client; max_concurrency]; - let mut clients = ClientPool::new(send_resp, recv_req, clients); - clients.listen().await; + futures::StreamExt::for_each_concurrent( + ReceiverStream::new(recv_req), + max_concurrency, + |req| async { + let resp = client.check(req).await.unwrap(); + send_resp.send(resp).await.unwrap(); + }, + ) + .await; }); let show_results_task = tokio::spawn({ diff --git a/lychee-lib/Cargo.toml b/lychee-lib/Cargo.toml index 7bf2cf5efc..a43292bf6f 100644 --- a/lychee-lib/Cargo.toml +++ b/lychee-lib/Cargo.toml @@ -18,7 +18,6 @@ version = "0.7.2" [dependencies] check-if-email-exists = "0.8.25" -deadpool = "0.7.0" fast_chemail = "0.9.6" glob = "0.3.0" html5ever = "0.25.1" diff --git a/lychee-lib/src/client_pool.rs b/lychee-lib/src/client_pool.rs deleted file mode 100644 index a2c8a319f1..0000000000 --- a/lychee-lib/src/client_pool.rs +++ /dev/null @@ -1,49 +0,0 @@ -use client::Client; -use deadpool::unmanaged::Pool; -use tokio::sync::mpsc; - -use crate::{client, types}; - -#[allow(missing_debug_implementations)] -/// Manages a channel for incoming requests -/// and a pool of lychee clients to handle them -/// -/// Note: Although `reqwest` has its own pool, -/// it only works for connections to the same host, so -/// a single client can still be blocked until a request is done. -pub struct ClientPool { - tx: mpsc::Sender, - rx: mpsc::Receiver, - pool: deadpool::unmanaged::Pool, -} - -impl ClientPool { - #[must_use] - /// Creates a new client pool - pub fn new( - tx: mpsc::Sender, - rx: mpsc::Receiver, - clients: Vec, - ) -> Self { - let pool = Pool::from(clients); - ClientPool { tx, rx, pool } - } - - #[allow(clippy::missing_panics_doc)] - /// Start listening for incoming requests and send each of them - /// asynchronously to a client from the pool - pub async fn listen(&mut self) { - while let Some(req) = self.rx.recv().await { - let client = self.pool.get().await; - let tx = self.tx.clone(); - tokio::spawn(async move { - // Client::check() may fail only because Request::try_from() may fail - // here request is already Request, so it never fails - let resp = client.check(req).await.unwrap(); - tx.send(resp) - .await - .expect("Cannot send response to channel"); - }); - } - } -} diff --git a/lychee-lib/src/lib.rs b/lychee-lib/src/lib.rs index 61e939dd91..e4d1de057a 100644 --- a/lychee-lib/src/lib.rs +++ b/lychee-lib/src/lib.rs @@ -47,7 +47,6 @@ doc_comment::doctest!("../../README.md"); mod client; -mod client_pool; /// A pool of clients, to handle concurrent checks pub mod collector; mod helpers; @@ -74,7 +73,6 @@ use ring as _; // required for apple silicon #[doc(inline)] pub use crate::{ client::{check, Client, ClientBuilder}, - client_pool::ClientPool, collector::Collector, filter::{Excludes, Filter, Includes}, types::{Base, ErrorKind, Input, Request, Response, ResponseBody, Result, Status, Uri}, From f33468e93a4dba2efb449d1e108a96e52e2825ca Mon Sep 17 00:00:00 2001 From: Matthias Date: Sun, 10 Oct 2021 17:24:54 +0200 Subject: [PATCH 12/35] Refactor main; fix tests --- Cargo.lock | 2 + examples/client_pool/Cargo.toml | 2 + examples/client_pool/client_pool.rs | 25 ++- lychee-bin/src/client.rs | 52 ++++++ lychee-bin/src/main.rs | 276 ++++++++++------------------ lychee-bin/src/options.rs | 29 ++- lychee-bin/src/stats.rs | 29 +++ lychee-bin/tests/cli.rs | 2 +- 8 files changed, 224 insertions(+), 193 deletions(-) create mode 100644 lychee-bin/src/client.rs diff --git a/Cargo.lock b/Cargo.lock index 5fae557ce0..28c9efd359 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -519,8 +519,10 @@ dependencies = [ name = "client_pool" version = "0.1.0" dependencies = [ + "futures", "lychee-lib", "tokio", + "tokio-stream", ] [[package]] diff --git a/examples/client_pool/Cargo.toml b/examples/client_pool/Cargo.toml index 2f305e9ea5..7db7acc62a 100644 --- a/examples/client_pool/Cargo.toml +++ b/examples/client_pool/Cargo.toml @@ -10,3 +10,5 @@ path = "client_pool.rs" [dependencies] lychee-lib = { path = "../../lychee-lib", version = "0.7.2" } tokio = { version = "1.12.0", features = ["full"] } +futures = "0.3.17" +tokio-stream = "0.1.7" diff --git a/examples/client_pool/client_pool.rs b/examples/client_pool/client_pool.rs index ef4bc8d46b..a55e7cbb4f 100644 --- a/examples/client_pool/client_pool.rs +++ b/examples/client_pool/client_pool.rs @@ -1,14 +1,14 @@ -use lychee_lib::{ClientBuilder, ClientPool, Input, Request, Result, Uri}; +use lychee_lib::{ClientBuilder, Input, Request, Result, Uri}; use std::convert::TryFrom; use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; const CONCURRENT_REQUESTS: usize = 4; #[tokio::main] -#[allow(clippy::trivial_regex)] async fn main() -> Result<()> { // These channels are used to send requests and receive responses to and - // from the lychee client pool + // from lychee let (send_req, recv_req) = mpsc::channel(CONCURRENT_REQUESTS); let (send_resp, mut recv_resp) = mpsc::channel(CONCURRENT_REQUESTS); @@ -18,7 +18,7 @@ async fn main() -> Result<()> { Input::Stdin, )]; - // Send requests to pool + // Queue requests tokio::spawn(async move { for request in requests { println!("Sending {}", request); @@ -29,13 +29,18 @@ async fn main() -> Result<()> { // Create a default lychee client let client = ClientBuilder::default().client()?; - // Create a pool with four lychee clients - let clients = vec![client; CONCURRENT_REQUESTS]; - let mut clients = ClientPool::new(send_resp, recv_req, clients); - - // Handle requests in a client pool + // Start receiving requests + // Requests get streamed into the client and run concurrently tokio::spawn(async move { - clients.listen().await; + futures::StreamExt::for_each_concurrent( + ReceiverStream::new(recv_req), + CONCURRENT_REQUESTS, + |req| async { + let resp = client.check(req).await.unwrap(); + send_resp.send(resp).await.unwrap(); + }, + ) + .await; }); // Finally, listen to incoming responses from lychee diff --git a/lychee-bin/src/client.rs b/lychee-bin/src/client.rs new file mode 100644 index 0000000000..294537cd8a --- /dev/null +++ b/lychee-bin/src/client.rs @@ -0,0 +1,52 @@ +use crate::options::Config; +use crate::parse::{parse_basic_auth, parse_headers, parse_statuscodes, parse_timeout}; +use anyhow::{anyhow, Result}; +use headers::HeaderMapExt; +use lychee_lib::{Client, ClientBuilder}; +use regex::RegexSet; +use std::iter::FromIterator; +use std::{collections::HashSet, str::FromStr}; + +/// Creates a client according to the command-line config +pub(crate) fn create(cfg: &Config) -> Result { + let mut headers = parse_headers(&cfg.headers)?; + if let Some(auth) = &cfg.basic_auth { + let auth_header = parse_basic_auth(auth)?; + headers.typed_insert(auth_header); + } + + let accepted = cfg.accept.clone().and_then(|a| parse_statuscodes(&a).ok()); + let timeout = parse_timeout(cfg.timeout); + let method: reqwest::Method = reqwest::Method::from_str(&cfg.method.to_uppercase())?; + let include = RegexSet::new(&cfg.include)?; + let exclude = RegexSet::new(&cfg.exclude)?; + + // Offline mode overrides the scheme + let schemes = if cfg.offline { + vec!["file".to_string()] + } else { + cfg.scheme.clone() + }; + + ClientBuilder::builder() + .includes(include) + .excludes(exclude) + .exclude_all_private(cfg.exclude_all_private) + .exclude_private_ips(cfg.exclude_private) + .exclude_link_local_ips(cfg.exclude_link_local) + .exclude_loopback_ips(cfg.exclude_loopback) + .exclude_mail(cfg.exclude_mail) + .max_redirects(cfg.max_redirects) + .user_agent(cfg.user_agent.clone()) + .allow_insecure(cfg.insecure) + .custom_headers(headers) + .method(method) + .timeout(timeout) + .github_token(cfg.github_token.clone()) + .schemes(HashSet::from_iter(schemes)) + .accepted(accepted) + .require_https(cfg.require_https) + .build() + .client() + .map_err(|e| anyhow!("Failed to create request client: {}", e)) +} diff --git a/lychee-bin/src/main.rs b/lychee-bin/src/main.rs index c7915c5d46..06ea04a677 100644 --- a/lychee-bin/src/main.rs +++ b/lychee-bin/src/main.rs @@ -58,34 +58,28 @@ #![deny(anonymous_parameters, macro_use_extern_crate, pointer_structural_match)] #![deny(missing_docs)] +use lychee_lib::{Collector, Input, Request, Response}; // required for apple silicon use ring as _; use tokio_stream::wrappers::ReceiverStream; -use std::fs::File; -use std::io::{self, BufRead, Write}; -use std::iter::FromIterator; -use std::{collections::HashSet, fs, str::FromStr}; +use std::io::{self, Write}; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, Result}; use futures::stream::TryStreamExt; -use headers::HeaderMapExt; use indicatif::{ProgressBar, ProgressStyle}; -use lychee_lib::{Client, ClientBuilder, Collector, Input, Request, Response}; use openssl_sys as _; // required for vendored-openssl feature -use regex::RegexSet; use ring as _; // required for apple silicon -use structopt::StructOpt; use tokio::sync::mpsc; use tokio_stream::StreamExt; +mod client; mod options; mod parse; mod stats; -use crate::parse::{parse_basic_auth, parse_headers, parse_statuscodes, parse_timeout}; use crate::{ - options::{Config, Format, LycheeOptions}, + options::Config, stats::{color_response, ResponseStats}, }; @@ -104,7 +98,7 @@ fn main() -> Result<()> { #[cfg(feature = "tokio-console")] console_subscriber::init(); // std::process::exit doesn't guarantee that all destructors will be ran, - // therefore we wrap "main" code in another function to guarantee that. + // therefore we wrap "main" code in another function to ensure that. // See: https://doc.rust-lang.org/stable/std/process/fn.exit.html // Also see: https://www.youtube.com/watch?v=zQC8T71Y8e4 let exit_code = run_main()?; @@ -112,24 +106,9 @@ fn main() -> Result<()> { } fn run_main() -> Result { - let mut opts = LycheeOptions::from_args(); + let merged = options::merge()?; - // Load a potentially existing config file and merge it into the config from the CLI - if let Some(c) = Config::load_from_file(&opts.config_file)? { - opts.config.merge(c); - } - - // Load excludes from file - for path in &opts.config.exclude_file { - let file = File::open(path)?; - opts.config - .exclude - .append(&mut io::BufReader::new(file).lines().collect::>()?); - } - - let cfg = &opts.config; - - let runtime = match cfg.threads { + let runtime = match merged.config.threads { Some(threads) => { // We define our own runtime instead of the `tokio::main` attribute // since we want to make the number of threads configurable @@ -141,170 +120,122 @@ fn run_main() -> Result { None => tokio::runtime::Runtime::new()?, }; - runtime.block_on(run(cfg, opts.inputs())) -} - -fn show_progress(progress_bar: &Option, response: &Response, verbose: bool) { - let out = color_response(&response.1); - if let Some(pb) = progress_bar { - pb.inc(1); - pb.set_message(&out); - if verbose { - pb.println(out); - } - } else { - if (response.status().is_success() || response.status().is_excluded()) && !verbose { - return; - } - println!("{}", out); - } -} - -fn fmt(stats: &ResponseStats, format: &Format) -> Result { - Ok(match format { - Format::String => stats.to_string(), - Format::Json => serde_json::to_string_pretty(&stats)?, - }) -} - -fn create_client(cfg: &Config) -> Result { - let mut headers = parse_headers(&cfg.headers)?; - if let Some(auth) = &cfg.basic_auth { - let auth_header = parse_basic_auth(auth)?; - headers.typed_insert(auth_header); - } - - let accepted = cfg.accept.clone().and_then(|a| parse_statuscodes(&a).ok()); - let timeout = parse_timeout(cfg.timeout); - let method: reqwest::Method = reqwest::Method::from_str(&cfg.method.to_uppercase())?; - let include = RegexSet::new(&cfg.include)?; - let exclude = RegexSet::new(&cfg.exclude)?; - - // Offline mode overrides the scheme - let schemes = if cfg.offline { - vec!["file".to_string()] - } else { - cfg.scheme.clone() - }; - - ClientBuilder::builder() - .includes(include) - .excludes(exclude) - .exclude_all_private(cfg.exclude_all_private) - .exclude_private_ips(cfg.exclude_private) - .exclude_link_local_ips(cfg.exclude_link_local) - .exclude_loopback_ips(cfg.exclude_loopback) - .exclude_mail(cfg.exclude_mail) - .max_redirects(cfg.max_redirects) - .user_agent(cfg.user_agent.clone()) - .allow_insecure(cfg.insecure) - .custom_headers(headers) - .method(method) - .timeout(timeout) - .github_token(cfg.github_token.clone()) - .schemes(HashSet::from_iter(schemes)) - .accepted(accepted) - .require_https(cfg.require_https) - .build() - .client() - .map_err(|e| anyhow!("Failed to create request client: {}", e)) + runtime.block_on(run(&merged.config, merged.inputs())) } async fn run(cfg: &Config, inputs: Vec) -> Result { - let client = create_client(cfg)?; + let client = client::create(cfg)?; + println!("Collect links"); let links = Collector::new(cfg.base.clone(), cfg.skip_missing, cfg.max_concurrency) .collect_links(&inputs) .await - .map_err(|e| anyhow!(e)); + .map_err(|e| anyhow!("Cannot collect links from inputs: {}", e)); - if cfg.dump { - let exit_code = dump_links( + let exit_code = if cfg.dump { + dump_links( links .collect::>>() .await? .iter() .filter(|link| !client.filtered(&link.uri)), - ); - return Ok(exit_code as i32); - } - - let pb = if cfg.no_progress { - None + ) } else { - let bar = ProgressBar::new_spinner().with_style(ProgressStyle::default_bar().template( - "{spinner:.red.bright} {pos}/{len:.dim} [{elapsed_precise}] {bar:25} {wide_msg}", - )); - bar.set_length(0); - bar.set_message("Extracting links"); - bar.enable_steady_tick(100); - Some(bar) - }; - - let (send_req, recv_req) = mpsc::channel(cfg.max_concurrency); - let (send_resp, mut recv_resp) = mpsc::channel(cfg.max_concurrency); - - let mut stats = ResponseStats::new(); + let (send_req, recv_req) = mpsc::channel(cfg.max_concurrency); + let (send_resp, mut recv_resp) = mpsc::channel(cfg.max_concurrency); + let max_concurrency = cfg.max_concurrency; + let mut stats = ResponseStats::new(); + + // Start receiving requests + tokio::spawn(async move { + println!("Spawn checker"); + futures::StreamExt::for_each_concurrent( + ReceiverStream::new(recv_req), + max_concurrency, + |req| async { + let resp = client.check(req).await.unwrap(); + send_resp.send(resp).await.unwrap(); + }, + ) + .await; + }); + + let pb = if cfg.no_progress { + None + } else { + let bar = ProgressBar::new_spinner().with_style(ProgressStyle::default_bar().template( + "{spinner:.red.bright} {pos}/{len:.dim} [{elapsed_precise}] {bar:25} {wide_msg}", + )); + bar.set_length(0); + bar.set_message("Extracting links"); + bar.enable_steady_tick(100); + Some(bar) + }; - let bar = pb.clone(); - let max_concurrency = cfg.max_concurrency; + let bar = pb.clone(); + let show_results_task = tokio::spawn({ + let verbose = cfg.verbose; + async move { + while let Some(response) = recv_resp.recv().await { + show_progress(&pb, &response, verbose); + stats.add(response); + } + (pb, stats) + } + }); - // Start receiving requests - tokio::spawn(async move { - futures::StreamExt::for_each_concurrent( - ReceiverStream::new(recv_req), - max_concurrency, - |req| async { - let resp = client.check(req).await.unwrap(); - send_resp.send(resp).await.unwrap(); - }, - ) - .await; - }); + tokio::pin!(links); - let show_results_task = tokio::spawn({ - let verbose = cfg.verbose; - async move { - while let Some(response) = recv_resp.recv().await { - show_progress(&pb, &response, verbose); - stats.add(response); - } - (pb, stats) + while let Some(link) = links.next().await { + let link = link?; + if let Some(pb) = &bar { + pb.inc_length(1); + pb.set_message(&link.to_string()); + }; + send_req.send(link).await.unwrap(); } - }); + // required for the receiver task to end, which closes send_resp, which allows + // the show_results_task to finish + drop(send_req); - tokio::pin!(links); - while let Some(link) = links.next().await { - let link = link?; - if let Some(pb) = &bar { - pb.inc_length(1); - pb.set_message(&link.to_string()); - }; - send_req.send(link).await.unwrap(); - } - // required for the receiver task to end, which closes send_resp, which allows - // the show_results_task to finish - drop(send_req); + let (pb, stats) = show_results_task.await?; - let (pb, stats) = show_results_task.await?; + // Note that print statements may interfere with the progress bar, so this + // must go before printing the stats + if let Some(pb) = &pb { + pb.finish_and_clear(); + } - // Note that print statements may interfere with the progress bar, so this - // must go before printing the stats - if let Some(pb) = &pb { - pb.finish_and_clear(); - } + stats::write(&stats, cfg)?; - write_stats(&stats, cfg)?; + if stats.is_success() { + ExitCode::Success + } else { + ExitCode::LinkCheckFailure + } + }; + Ok(exit_code as i32) +} - if stats.is_success() { - Ok(ExitCode::Success as i32) +fn show_progress(progress_bar: &Option, response: &Response, verbose: bool) { + let out = color_response(&response.1); + if let Some(pb) = progress_bar { + pb.inc(1); + pb.set_message(&out); + if verbose { + pb.println(out); + } } else { - Ok(ExitCode::LinkCheckFailure as i32) + if (response.status().is_success() || response.status().is_excluded()) && !verbose { + return; + } + println!("{}", out); } } /// Dump all detected links to stdout without checking them fn dump_links<'a>(links: impl Iterator) -> ExitCode { + println!("inside dump"); let mut stdout = io::stdout(); for link in links { // Avoid panic on broken pipe. @@ -320,20 +251,3 @@ fn dump_links<'a>(links: impl Iterator) -> ExitCode { } ExitCode::Success } - -/// Write final statistics to stdout or to file -fn write_stats(stats: &ResponseStats, cfg: &Config) -> Result<()> { - let formatted = fmt(stats, &cfg.format)?; - - if let Some(output) = &cfg.output { - fs::write(output, formatted).context("Cannot write status output to file")?; - } else { - if cfg.verbose && !stats.is_empty() { - // separate summary from the verbose list of links above - println!(); - } - // we assume that the formatted stats don't have a final newline - println!("{}", stats); - } - Ok(()) -} diff --git a/lychee-bin/src/options.rs b/lychee-bin/src/options.rs index 790986d787..6622eb587e 100644 --- a/lychee-bin/src/options.rs +++ b/lychee-bin/src/options.rs @@ -1,9 +1,16 @@ -use std::{convert::TryFrom, fs, io::ErrorKind, path::PathBuf, str::FromStr}; +use std::{ + convert::TryFrom, + fs::{self, File}, + io::{self, ErrorKind}, + path::PathBuf, + str::FromStr, +}; use anyhow::{anyhow, Error, Result}; use lazy_static::lazy_static; use lychee_lib::{Base, Input}; use serde::Deserialize; +use std::io::BufRead; use structopt::{clap::crate_version, StructOpt}; const METHOD: &str = "get"; @@ -330,3 +337,23 @@ impl Config { } } } + +/// Merge all provided config options into one +/// This includes a potential config file, command-line- and environment variables +pub(crate) fn merge() -> Result { + let mut opts = LycheeOptions::from_args(); + + // Merge a potentially existing config file and merge it into the config from the CLI + if let Some(c) = Config::load_from_file(&opts.config_file)? { + opts.config.merge(c); + } + + // Load and merge excludes from file + for path in &opts.config.exclude_file { + let file = File::open(path)?; + opts.config + .exclude + .append(&mut io::BufReader::new(file).lines().collect::>()?); + } + Ok(opts) +} diff --git a/lychee-bin/src/stats.rs b/lychee-bin/src/stats.rs index 579c30a21e..8558b51c63 100644 --- a/lychee-bin/src/stats.rs +++ b/lychee-bin/src/stats.rs @@ -1,14 +1,19 @@ use std::{ collections::{HashMap, HashSet}, fmt::{self, Display}, + fs, }; +use anyhow::{Context, Result}; + use console::Style; use lychee_lib::{Input, Response, ResponseBody, Status}; use once_cell::sync::Lazy; use pad::{Alignment, PadStr}; use serde::Serialize; +use crate::options::{Config, Format}; + static GREEN: Lazy