|
| 1 | +use std::time::Duration; |
| 2 | + |
| 3 | +use async_trait::async_trait; |
| 4 | +use camino::{Utf8Path, Utf8PathBuf}; |
| 5 | +use clap::{Args, Subcommand}; |
| 6 | +use color_eyre::eyre::{Context, Report, Result, bail, eyre}; |
| 7 | +use obo_core::{ |
| 8 | + actions::{ |
| 9 | + Actions, DEFAULT_BUILD_INFO, DEFAULT_BUILD_LOG, DownloadBinariesAction, DputAction, |
| 10 | + LOG_TAIL_2MB, MonitorAction, ObsBuildInfo, PruneAction, |
| 11 | + }, |
| 12 | + artifacts::{ArtifactDirectory, ArtifactReader, ArtifactWriter, MissingArtifact, SaveCallback}, |
| 13 | + build_meta::RepoArch, |
| 14 | + monitor::PackageMonitoringOptions, |
| 15 | + outputln, |
| 16 | +}; |
| 17 | +use open_build_service_api as obs; |
| 18 | +use serde::{Deserialize, Serialize}; |
| 19 | +use tempfile::NamedTempFile; |
| 20 | +use tokio::{ |
| 21 | + fs::File as AsyncFile, |
| 22 | + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, |
| 23 | +}; |
| 24 | + |
| 25 | +pub const DEFAULT_MONITOR_TABLE: &str = "obs-monitor.json"; |
| 26 | + |
| 27 | +#[derive(Debug, Deserialize, Serialize)] |
| 28 | +pub struct MonitorCommands { |
| 29 | + pub monitor: String, |
| 30 | + pub download_binaries: Option<String>, |
| 31 | +} |
| 32 | + |
| 33 | +#[derive(Debug, Deserialize, Serialize)] |
| 34 | +pub struct MonitorEntry { |
| 35 | + #[serde(flatten)] |
| 36 | + pub repo_arch: RepoArch, |
| 37 | + pub commands: MonitorCommands, |
| 38 | +} |
| 39 | + |
| 40 | +#[derive(Debug, Deserialize, Serialize)] |
| 41 | +pub struct MonitorTable { |
| 42 | + pub entries: Vec<MonitorEntry>, |
| 43 | +} |
| 44 | + |
| 45 | +#[derive(Args)] |
| 46 | +pub struct GenerateMonitorAction { |
| 47 | + #[clap(long, default_value_t = DEFAULT_BUILD_INFO.to_owned())] |
| 48 | + build_info: String, |
| 49 | + #[clap(long, default_value_t = DEFAULT_MONITOR_TABLE.to_owned())] |
| 50 | + monitor_out: String, |
| 51 | + #[clap(long, default_value_t = DEFAULT_BUILD_LOG.into())] |
| 52 | + build_log_out: String, |
| 53 | + #[clap(long = "download-build-results-to")] |
| 54 | + build_results_dir: Option<Utf8PathBuf>, |
| 55 | +} |
| 56 | + |
| 57 | +#[derive(Subcommand)] |
| 58 | +pub enum CliAction { |
| 59 | + Dput(DputAction), |
| 60 | + Monitor { |
| 61 | + #[clap(flatten)] |
| 62 | + args: MonitorAction, |
| 63 | + |
| 64 | + // These are needed by the integration tests. |
| 65 | + #[clap(long, hide = true, env = "OBO_TEST_LOG_TAIL", default_value_t = LOG_TAIL_2MB)] |
| 66 | + log_tail: u64, |
| 67 | + #[clap(long, hide = true, env = "OBO_TEST_SLEEP_ON_BUILDING_MS")] |
| 68 | + sleep_on_building_ms: Option<u64>, |
| 69 | + #[clap(long, hide = true, env = "OBO_TEST_SLEEP_ON_OLD_STATUS_MS")] |
| 70 | + sleep_on_old_status_ms: Option<u64>, |
| 71 | + }, |
| 72 | + GenerateMonitor(GenerateMonitorAction), |
| 73 | + DownloadBinaries(DownloadBinariesAction), |
| 74 | + Prune(PruneAction), |
| 75 | +} |
| 76 | + |
| 77 | +#[derive(Default)] |
| 78 | +pub struct LocalFsArtifacts(pub Utf8PathBuf); |
| 79 | + |
| 80 | +#[async_trait] |
| 81 | +impl ArtifactDirectory for LocalFsArtifacts { |
| 82 | + async fn open(&self, path: impl AsRef<Utf8Path> + Send) -> Result<ArtifactReader> { |
| 83 | + let path = self.0.join(path.as_ref()); |
| 84 | + AsyncFile::open(&path) |
| 85 | + .await |
| 86 | + .map(ArtifactReader::new) |
| 87 | + .map_err(|e| { |
| 88 | + if e.kind() == std::io::ErrorKind::NotFound { |
| 89 | + eyre!(MissingArtifact(path)) |
| 90 | + } else { |
| 91 | + eyre!(e) |
| 92 | + } |
| 93 | + }) |
| 94 | + } |
| 95 | + |
| 96 | + async fn save_with<Ret, Err, F, P>(&mut self, path: P, func: F) -> Result<Ret> |
| 97 | + where |
| 98 | + Report: From<Err>, |
| 99 | + Ret: Send, |
| 100 | + Err: Send, |
| 101 | + F: for<'a> SaveCallback<'a, Ret, Err> + Send, |
| 102 | + P: AsRef<Utf8Path> + Send, |
| 103 | + { |
| 104 | + let path = self.0.join(path.as_ref()); |
| 105 | + let parent = path.parent().unwrap_or_else(|| Utf8Path::new(".")); |
| 106 | + tokio::fs::create_dir_all(&parent) |
| 107 | + .await |
| 108 | + .wrap_err_with(|| format!("Failed to create parents of '{path}"))?; |
| 109 | + |
| 110 | + let Some(basename) = path.file_name() else { |
| 111 | + bail!("Invalid path: {path}"); |
| 112 | + }; |
| 113 | + let temp = NamedTempFile::with_prefix_in(basename, parent) |
| 114 | + .wrap_err("Failed to create temporary file")?; |
| 115 | + |
| 116 | + let mut writer = ArtifactWriter::new(AsyncFile::from_std(temp.as_file().try_clone()?)); |
| 117 | + let ret = func(&mut writer).await?; |
| 118 | + |
| 119 | + writer.flush().await?; |
| 120 | + temp.persist(&path)?; |
| 121 | + Ok(ret) |
| 122 | + } |
| 123 | +} |
| 124 | + |
| 125 | +pub struct Handler { |
| 126 | + actions: Actions, |
| 127 | + artifacts: LocalFsArtifacts, |
| 128 | +} |
| 129 | + |
| 130 | +impl Handler { |
| 131 | + pub fn new(client: obs::Client, artifacts_dir: Utf8PathBuf) -> Self { |
| 132 | + Self { |
| 133 | + actions: Actions { client }, |
| 134 | + artifacts: LocalFsArtifacts(artifacts_dir), |
| 135 | + } |
| 136 | + } |
| 137 | + |
| 138 | + async fn generate_monitor(&mut self, args: GenerateMonitorAction) -> Result<()> { |
| 139 | + let build_info_data = self.artifacts.read_string(&args.build_info).await?; |
| 140 | + let build_info: ObsBuildInfo = serde_json::from_str(&build_info_data) |
| 141 | + .wrap_err("Failed to parse provided build info file")?; |
| 142 | + |
| 143 | + let rev = build_info |
| 144 | + .rev |
| 145 | + .ok_or_else(|| eyre!("Build revision was not set"))?; |
| 146 | + let srcmd5 = build_info |
| 147 | + .srcmd5 |
| 148 | + .ok_or_else(|| eyre!("Build srcmd5 was not set"))?; |
| 149 | + |
| 150 | + let mut table = MonitorTable { entries: vec![] }; |
| 151 | + for enabled_repo in build_info.enabled_repos { |
| 152 | + table.entries.push(MonitorEntry { |
| 153 | + repo_arch: enabled_repo.repo_arch.clone(), |
| 154 | + commands: MonitorCommands { |
| 155 | + monitor: MonitorAction { |
| 156 | + project: build_info.project.clone(), |
| 157 | + package: build_info.package.clone(), |
| 158 | + repository: enabled_repo.repo_arch.repo.clone(), |
| 159 | + arch: enabled_repo.repo_arch.arch.clone(), |
| 160 | + rev: rev.clone(), |
| 161 | + srcmd5: srcmd5.clone(), |
| 162 | + prev_endtime_for_commit: enabled_repo.prev_endtime_for_commit, |
| 163 | + build_log_out: args.build_log_out.clone(), |
| 164 | + } |
| 165 | + .generate_command(), |
| 166 | + download_binaries: args.build_results_dir.clone().map(|build_results_dir| { |
| 167 | + DownloadBinariesAction { |
| 168 | + project: build_info.project.clone(), |
| 169 | + package: build_info.package.clone(), |
| 170 | + repository: enabled_repo.repo_arch.repo, |
| 171 | + arch: enabled_repo.repo_arch.arch, |
| 172 | + build_results_dir, |
| 173 | + } |
| 174 | + .generate_command() |
| 175 | + }), |
| 176 | + }, |
| 177 | + }); |
| 178 | + } |
| 179 | + |
| 180 | + let data = serde_json::to_string(&table).wrap_err("Failed to serialize data")?; |
| 181 | + |
| 182 | + self.artifacts |
| 183 | + .write(&args.monitor_out, data.as_bytes()) |
| 184 | + .await?; |
| 185 | + outputln!("Wrote monitor file '{}'.", args.monitor_out); |
| 186 | + |
| 187 | + Ok(()) |
| 188 | + } |
| 189 | + |
| 190 | + pub async fn run(&mut self, action: CliAction) -> Result<()> { |
| 191 | + match action { |
| 192 | + CliAction::Dput(args) => self.actions.dput(args, &mut self.artifacts).await?, |
| 193 | + CliAction::Monitor { |
| 194 | + log_tail, |
| 195 | + sleep_on_building_ms, |
| 196 | + sleep_on_old_status_ms, |
| 197 | + args, |
| 198 | + } => { |
| 199 | + let mut options = PackageMonitoringOptions::default(); |
| 200 | + if let Some(value) = sleep_on_building_ms { |
| 201 | + options.sleep_on_building = Duration::from_millis(value); |
| 202 | + } |
| 203 | + if let Some(value) = sleep_on_old_status_ms { |
| 204 | + options.sleep_on_old_status = Duration::from_millis(value); |
| 205 | + } |
| 206 | + |
| 207 | + self.actions |
| 208 | + .monitor( |
| 209 | + args, |
| 210 | + options, |
| 211 | + |file| async { |
| 212 | + let mut lines = BufReader::new(file).lines(); |
| 213 | + while let Some(line) = lines.next_line().await? { |
| 214 | + eprintln!("{line}"); |
| 215 | + } |
| 216 | + Ok(()) |
| 217 | + }, |
| 218 | + log_tail, |
| 219 | + &mut self.artifacts, |
| 220 | + ) |
| 221 | + .await? |
| 222 | + } |
| 223 | + CliAction::GenerateMonitor(args) => self.generate_monitor(args).await?, |
| 224 | + CliAction::DownloadBinaries(args) => { |
| 225 | + self.actions |
| 226 | + .download_binaries(args, &mut self.artifacts) |
| 227 | + .await? |
| 228 | + } |
| 229 | + CliAction::Prune(args) => self.actions.prune(args, &self.artifacts).await?, |
| 230 | + } |
| 231 | + |
| 232 | + Ok(()) |
| 233 | + } |
| 234 | +} |
0 commit comments