diff --git a/Cargo.toml b/Cargo.toml
index 9fb2604..2d12352 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,12 +9,14 @@ homepage = "https://gitlab.collabora.com/lava/lava-gitlab-runner"
 repository = "https://gitlab.collabora.com/lava/lava-gitlab-runner.git"
 
 [dependencies]
+axum = "0.6"
 bytes = "1.2.0"
 chrono = { version = "0.4", features = ["serde"] }
 colored = "2"
 gitlab-runner = "0.0.8"
 lava-api = "0.1.1"
 lazy_static = "1.4"
+local-ip-address = "0.5"
 structopt = "0.3.23"
 url = "2.2.2"
 tokio = "1.12.0"
@@ -27,6 +29,7 @@ serde = { version = "^1.0.97", features = ["derive"] }
 serde_json = "1.0.68"
 serde_yaml = "0.9"
 rand = "0.8.4"
+rand_chacha = "0.3"
 tempfile = "3.3.0"
 tokio-util = { version = "0.7", features = [ "io" ] }
 tracing-subscriber = { version = "0.3.9", features = [ "env-filter"] }
diff --git a/src/main.rs b/src/main.rs
index 1edd88e..2f419c5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,10 +1,15 @@
 use std::borrow::Cow;
 use std::collections::btree_map::Entry;
 use std::collections::{BTreeMap, HashSet};
+use std::env;
 use std::io::Read;
+use std::net::{IpAddr, Ipv4Addr};
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
+use axum::extract::Path;
+use axum::routing::post;
+use axum::Router;
 use bytes::{Buf, Bytes};
 use colored::{Color, Colorize};
 use futures::stream::{Stream, TryStreamExt};
@@ -25,13 +30,16 @@ use structopt::StructOpt;
 use tokio::time::sleep;
 use tokio_util::sync::CancellationToken;
 use tracing::Level;
-use tracing::{debug, info};
+use tracing::{debug, info, warn};
 use tracing_subscriber::filter;
 use tracing_subscriber::prelude::*;
 use url::Url;
 
 mod throttled;
+mod upload;
+
 use throttled::{ThrottledLava, Throttler};
+use upload::{JobArtifacts, UploadServer};
 
 const MASK_PATTERN: &str = "[MASKED]";
 
@@ -107,6 +115,7 @@ struct MonitorJobs {
 #[derive(Clone, Debug, Serialize)]
 struct TransformVariables<'a> {
     pub job: BTreeMap<&'a str, &'a str>,
+    pub runner: BTreeMap<&'a str, &'a str>,
 }
 
 #[derive(Debug)]
@@ -243,11 +252,18 @@ enum JobCancelBehaviour {
 struct AvailableArtifactStore {
     lava: Arc<ThrottledLava>,
     masker: Arc<Masker>,
+    artifact_caches: Mutex<BTreeMap<String, Arc<Mutex<JobArtifacts>>>>,
+    job_map: Mutex<BTreeMap<i64, Arc<Mutex<JobArtifacts>>>>,
 }
 
 impl AvailableArtifactStore {
     pub fn new(lava: Arc<ThrottledLava>, masker: Arc<Masker>) -> Self {
-        Self { lava, masker }
+        Self {
+            lava,
+            masker,
+            artifact_caches: Default::default(),
+            job_map: Default::default(),
+        }
     }
 
     pub fn get_log(
@@ -282,12 +298,53 @@
                 .flatten_stream(),
         )
     }
+
+    pub fn create_upload_url(&self) -> String {
+        let artifacts = UPLOAD_SERVER.lock().unwrap().add_new_job();
+        let url = artifacts.lock().unwrap().get_upload_url().to_string();
+        self.artifact_caches
+            .lock()
+            .unwrap()
+            .insert(url.clone(), artifacts);
+        url
+    }
+
+    pub fn add_job_for_upload_url(&self, id: i64, upload_url: &str) {
+        let artifacts = self
+            .artifact_caches
+            .lock()
+            .unwrap()
+            .get(upload_url)
+            .unwrap()
+            .clone();
+        self.job_map.lock().unwrap().insert(id, artifacts);
+    }
+
+    pub fn get_uploaded_artifact(&self, id: i64, path: &str) -> Option<Bytes> {
+        self.job_map
+            .lock()
+            .unwrap()
+            .get(&id)
+            .and_then(|cache| cache.lock().unwrap().get_artifact_data(path))
+    }
+
+    pub fn get_uploaded_artifact_paths(&self, id: i64) -> Option<Vec<String>> {
+        self.job_map.lock().unwrap().get(&id).map(|cache| {
+            cache
+                .lock()
+                .unwrap()
+                .get_artifact_paths()
+                .map(str::to_string)
+                .collect()
+        })
+    }
 }
 
 #[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
 enum LavaUploadableFileType {
     Log { id: i64 },
     Junit { id: i64 },
+    Artifact { id: i64, path: String },
 }
 
 #[derive(Clone)]
@@ -336,15 +393,25 @@ impl LavaUploadableFile {
             store,
         }
     }
+
+    pub fn artifact(id: i64, path: String, store: Arc<AvailableArtifactStore>) -> Self {
+        Self {
+            which: LavaUploadableFileType::Artifact { id, path },
+            store,
+        }
+    }
 }
 
 impl UploadableFile for LavaUploadableFile {
     type Data<'a> = Box<dyn AsyncRead + Send + Unpin + 'a>;
 
     fn get_path(&self) -> Cow<'_, str> {
-        match self.which {
+        match &self.which {
             LavaUploadableFileType::Log { id } => format!("{}_log.yaml", id).into(),
             LavaUploadableFileType::Junit { id } => format!("{}_junit.xml", id).into(),
+            LavaUploadableFileType::Artifact { id, path } => {
+                format!("{}_artifacts/{}", id, path).into()
+            }
         }
     }
 
@@ -357,6 +424,9 @@ impl UploadableFile for LavaUploadableFile {
             LavaUploadableFileType::Junit { id } => {
                 Box::new(self.store.get_junit(*id).into_async_read())
             }
+            LavaUploadableFileType::Artifact { id, path } => Box::new(futures::io::Cursor::new(
+                self.store.get_uploaded_artifact(*id, path).unwrap(),
+            )),
         }
     }
 }
@@ -724,7 +794,7 @@ impl Run {
         }
     }
 
-    fn transform(&self, definition: String) -> Result<String, ()> {
+    fn transform(&self, definition: String, upload_url: &str) -> Result<String, ()> {
         let mut handlebars = Handlebars::new();
         handlebars.set_strict_mode(true);
         handlebars
@@ -739,6 +809,7 @@
                 .variables()
                 .map(|var| (var.key(), var.value()))
                 .collect(),
+            runner: BTreeMap::from([("ARTIFACT_UPLOAD_URL", upload_url)]),
         };
         handlebars.render("definition", &mappings).map_err(|e| {
             outputln!("Failed to substitute in template: {}", e);
@@ -754,8 +825,9 @@
             "submit" => {
                 if let Some(filename) = p.next() {
                     let data = self.find_file(filename).await?;
+                    let upload_url = self.store.create_upload_url();
                     let definition = match String::from_utf8(data) {
-                        Ok(data) => self.transform(data)?,
+                        Ok(data) => self.transform(data, &upload_url)?,
                         Err(_) => {
                             outputln!("Job definition is not utf-8");
                             return Err(());
@@ -763,6 +835,9 @@
                 };
                 let ids = self.submit_definition(&definition).await?;
                 self.ids.extend(&ids);
+                for id in &self.ids {
+                    self.store.add_job_for_upload_url(*id, &upload_url);
+                }
                 self.follow_job(ids[0], cancel_token, JobCancelBehaviour::CancelLava)
                     .await
             } else {
@@ -834,6 +909,14 @@
         for id in &self.ids {
             available_files.push(LavaUploadableFile::log(*id, self.store.clone()));
             available_files.push(LavaUploadableFile::junit(*id, self.store.clone()));
+            for path in self
+                .store
+                .get_uploaded_artifact_paths(*id)
+                .into_iter()
+                .flatten()
+            {
+                available_files.push(LavaUploadableFile::artifact(*id, path, self.store.clone()));
+            }
         }
         Ok(Box::new(available_files.into_iter()))
     }
@@ -844,6 +927,7 @@ type LavaMap = Arc<Mutex<BTreeMap<String, Arc<ThrottledLava>>>>;
 lazy_static! {
     static ref LAVA_MAP: LavaMap = Arc::new(Mutex::new(BTreeMap::new()));
     static ref MAX_CONCURRENT_REQUESTS: Arc<Mutex<usize>> = Arc::new(Mutex::new(20));
+    static ref UPLOAD_SERVER: Arc<Mutex<UploadServer>> = Default::default();
 }
 
 async fn new_job(job: Job) -> Result<impl CancellableJobHandler<LavaUploadableFile>, ()> {
@@ -921,6 +1005,10 @@ async fn new_job(job: Job) -> Result<impl CancellableJobHandler<LavaUploadableFile>, ()> {
 }
 
+async fn upload_artifact(Path((job, path)): Path<(String, String)>, body: Bytes) {
+    UPLOAD_SERVER.lock().unwrap().upload_file(&job, &path, body);
+}
+
 #[tokio::main]
 async fn main() {
     let opts = Opts::from_args();
 
@@ -949,6 +1037,77 @@
         );
     }
 
+    tokio::spawn(async {
+        let local_port = match env::var("LAVA_GITLAB_RUNNER_LOCAL_PORT") {
+            Ok(val) => val
+                .parse()
+                .expect("failed to parse LAVA_GITLAB_RUNNER_LOCAL_PORT as a port number"),
+            Err(_) => {
+                warn!("No LAVA_GITLAB_RUNNER_LOCAL_PORT set, will listen on an ephemeral port.");
+                0u16
+            }
+        };
+
+        let listener = std::net::TcpListener::bind(std::net::SocketAddr::new(
+            IpAddr::V4(Ipv4Addr::UNSPECIFIED),
+            local_port,
+        ))
+        .expect("failed to bind listener");
+
+        let routable_host = match env::var("LAVA_GITLAB_RUNNER_ROUTABLE_HOST") {
+            Ok(val) => val,
+            Err(_) => {
+                let host = local_ip_address::local_ip()
+                    .expect("failed to determine local ip")
+                    .to_string();
+
+                warn!(
+                    "No LAVA_GITLAB_RUNNER_ROUTABLE_HOST set, using best guess of local IP {}.",
+                    host
+                );
+                host
+            }
+        };
+
+        let routable_port = match env::var("LAVA_GITLAB_RUNNER_ROUTABLE_PORT") {
+            Ok(val) => val
+                .parse()
+                .expect("failed to parse LAVA_GITLAB_RUNNER_ROUTABLE_PORT as a port number"),
+            Err(_) => {
+                let port = listener
+                    .local_addr()
+                    .expect("failed to get local address")
+                    .port();
+
+                info!(
+                    "No LAVA_GITLAB_RUNNER_ROUTABLE_PORT set, using local port {}.",
+                    port
+                );
+                port
+            }
+        };
+
+        let routable_addr = format!("{}:{}", routable_host, routable_port);
+
+        info!(
+            "Artifact upload listening on {} (reporting routable {})",
+            listener.local_addr().expect("failed to get local address"),
+            routable_addr
+        );
+
+        UPLOAD_SERVER
+            .lock()
+            .unwrap()
+            .set_base_address(routable_addr);
+        let app = Router::new().route("/artifacts/:job/*path", post(upload_artifact));
+
+        axum::Server::from_tcp(listener)
+            .expect("failed to create axum server from TCP listener")
+            .serve(app.into_make_service())
+            .await
+            .unwrap();
+    });
+
     runner
         .run(new_job, 64)
         .await
diff --git a/src/upload.rs b/src/upload.rs
new file mode 100644
index 0000000..6f38920
--- /dev/null
+++ b/src/upload.rs
@@ -0,0 +1,112 @@
+use std::collections::BTreeMap;
+use std::sync::{Arc, Mutex, Weak};
+
+use bytes::Bytes;
+use rand::distributions::Alphanumeric;
+use rand::{Rng, SeedableRng};
+use rand_chacha::ChaCha20Rng;
+use tracing::warn;
+
+pub struct UploadServer {
+    rng: ChaCha20Rng,
+    job_cache: BTreeMap<String, Weak<Mutex<JobArtifacts>>>,
+    base_addr: Option<String>,
+}
+
+impl UploadServer {
+    pub fn new() -> Self {
+        Self {
+            rng: ChaCha20Rng::from_entropy(),
+            job_cache: Default::default(),
+            base_addr: None,
+        }
+    }
+
+    pub fn add_new_job(&mut self) -> Arc<Mutex<JobArtifacts>> {
+        // Wipe any dead jobs as the new one starts. It's not hugely
+        // important when this happens, so long as it happens
+        // periodically.
+        self.cleanup();
+
+        let prefix = self.generate_unique_id();
+        let base_addr = self
+            .base_addr
+            .as_ref()
+            .expect("failed to set base_address on UploadServer before adding jobs.");
+        let url = format!("https://{}/artifacts/{}/", base_addr, prefix);
+        let ja: Arc<Mutex<JobArtifacts>> = Arc::new(Mutex::new(JobArtifacts::new(url)));
+        self.job_cache.insert(prefix, Arc::downgrade(&ja));
+        ja
+    }
+
+    pub fn set_base_address(&mut self, base_addr: String) {
+        self.base_addr = Some(base_addr);
+    }
+
+    pub fn upload_file(&mut self, key: &str, path: &str, data: Bytes) {
+        if let Some(ja) = self.job_cache.get(key).and_then(Weak::upgrade) {
+            ja.lock().unwrap().upload_artifact(path, data)
+        } else {
+            warn!(
+                "Ignoring attempt to upload {} for non-existent or expired job",
+                path
+            );
+        }
+    }
+
+    fn generate_unique_id(&mut self) -> String {
+        (&mut self.rng)
+            .sample_iter(&Alphanumeric)
+            .take(64)
+            .map(char::from)
+            .collect()
+    }
+
+    pub fn cleanup(&mut self) {
+        let mut cleaned = Vec::new();
+        for (k, v) in self.job_cache.iter() {
+            if v.strong_count() == 0 {
+                cleaned.push(k.clone());
+            }
+        }
+        for k in cleaned {
+            self.job_cache.remove(&k);
+        }
+    }
+}
+
+impl Default for UploadServer {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+pub struct JobArtifacts {
+    artifacts: BTreeMap<String, Bytes>,
+    url: String,
+}
+
+impl JobArtifacts {
+    fn new(url: String) -> Self {
+        JobArtifacts {
+            artifacts: Default::default(),
+            url,
+        }
+    }
+
+    pub fn get_upload_url(&self) -> &str {
+        &self.url
+    }
+
+    pub fn get_artifact_paths(&self) -> impl Iterator<Item = &str> {
+        self.artifacts.iter().map(|x| x.0.as_ref())
+    }
+
+    pub fn get_artifact_data(&self, path: &str) -> Option<Bytes> {
+        self.artifacts.get(path).map(Clone::clone)
+    }
+
+    pub fn upload_artifact(&mut self, path: &str, data: Bytes) {
+        self.artifacts.insert(path.to_string(), data);
+    }
+}
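
Note (illustration only, not part of the patch): the new `runner` map in TransformVariables means a job definition template can reference {{ runner.ARTIFACT_UPLOAD_URL }} wherever the device under test needs somewhere to push files. Below is a minimal, self-contained sketch of the substitution that transform() performs; the template fragment and the URL are invented examples, while the TransformVariables shape and the Handlebars calls mirror src/main.rs.

    use std::collections::BTreeMap;

    use handlebars::Handlebars;
    use serde::Serialize;

    #[derive(Serialize)]
    struct TransformVariables<'a> {
        pub job: BTreeMap<&'a str, &'a str>,
        pub runner: BTreeMap<&'a str, &'a str>,
    }

    fn main() {
        let mut handlebars = Handlebars::new();
        handlebars.set_strict_mode(true);
        // An invented fragment of a LAVA job definition: the test step POSTs
        // its results to the per-job upload URL injected by the runner.
        handlebars
            .register_template_string(
                "definition",
                "steps:\n  - curl -X POST --data-binary @results.tar.gz \
                 {{ runner.ARTIFACT_UPLOAD_URL }}results.tar.gz\n",
            )
            .unwrap();

        let mappings = TransformVariables {
            job: BTreeMap::new(),
            // In the real runner this URL comes from
            // AvailableArtifactStore::create_upload_url(); this one is made up.
            runner: BTreeMap::from([(
                "ARTIFACT_UPLOAD_URL",
                "https://203.0.113.7:8080/artifacts/abc123/",
            )]),
        };
        println!("{}", handlebars.render("definition", &mappings).unwrap());
    }

Anything POSTed to that URL is matched by the /artifacts/:job/*path route, stored by UploadServer::upload_file(), and published back to GitLab as <job id>_artifacts/<path> via LavaUploadableFile::artifact().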