From 6b4cdaffe7a6d6aa0687d9fe92287231c04a01c7 Mon Sep 17 00:00:00 2001
From: Antheas Kapenekakis
Date: Mon, 25 Nov 2024 23:56:08 +0100
Subject: [PATCH 1/2] feat: add total progress bar

---
NOTE(review): this copy was recovered from a whitespace-collapsed dump in
which text inside some angle brackets had been dropped. Indentation, blank
context lines and the generic arguments on context lines (Receiver<...>,
collect::<...>(), Result<...>) are reconstructed -- confirm them against the
target tree before applying. The author e-mail could not be recovered and the
index hashes are those of the original commits.
Changes relative to the original commit: println!("") -> println!(), and the
total bar position now uses saturating_add like the surrounding code.

 lib/src/deploy.rs                 | 22 +++++++++++++++++-----
 ostree-ext/src/container/store.rs |  2 +-
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/lib/src/deploy.rs b/lib/src/deploy.rs
index 960c1abd..02472944 100644
--- a/lib/src/deploy.rs
+++ b/lib/src/deploy.rs
@@ -142,6 +142,7 @@ async fn handle_layer_progress_print(
     mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
     mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
     n_layers_to_fetch: usize,
+    download_bytes: u64,
 ) {
     let start = std::time::Instant::now();
     let mut total_read = 0u64;
@@ -150,23 +151,30 @@ async fn handle_layer_progress_print(
         n_layers_to_fetch.try_into().unwrap(),
     ));
     let byte_bar = bar.add(indicatif::ProgressBar::new(0));
+    let total_byte_bar = bar.add(indicatif::ProgressBar::new(download_bytes));
     // let byte_bar = indicatif::ProgressBar::new(0);
     // byte_bar.set_draw_target(indicatif::ProgressDrawTarget::hidden());
+    println!();
     layers_bar.set_style(
         indicatif::ProgressStyle::default_bar()
-            .template("{prefix} {bar} {pos}/{len} {wide_msg}")
+            .template("{prefix} {pos}/{len} {bar:15}")
             .unwrap(),
     );
-    layers_bar.set_prefix("Fetching layers");
+    layers_bar.set_prefix("Fetched Layers");
     layers_bar.set_message("");
-    byte_bar.set_prefix("Fetching");
     byte_bar.set_style(
         indicatif::ProgressStyle::default_bar()
             .template(
-                " └ {prefix} {bar} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) {wide_msg}",
+                " └ {bar:20} {msg} ({binary_bytes}/{binary_total_bytes})",
             )
             .unwrap()
     );
+    total_byte_bar.set_prefix("Total");
+    total_byte_bar.set_style(
+        indicatif::ProgressStyle::default_bar()
+            .template("\n{prefix} {bar:30} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}, {elapsed}/{duration})")
+            .unwrap(),
+    );
     loop {
         tokio::select! {
             // Always handle layer changes first.
@@ -186,6 +194,7 @@ async fn handle_layer_progress_print(
                         byte_bar.set_position(layer_size);
                         layers_bar.inc(1);
                         total_read = total_read.saturating_add(layer_size);
+                        total_byte_bar.set_position(total_read);
                     }
                 } else {
                     // If the receiver is disconnected, then we're done
@@ -200,6 +209,7 @@ async fn handle_layer_progress_print(
                 let bytes = layer_bytes.borrow();
                 if let Some(bytes) = &*bytes {
                     byte_bar.set_position(bytes.fetched);
+                    total_byte_bar.set_position(total_read.saturating_add(bytes.fetched));
                 }
             }
         }
@@ -250,11 +260,13 @@ pub(crate) async fn pull(
     ostree_ext::cli::print_layer_status(&prep);
     let layers_to_fetch = prep.layers_to_fetch().collect::<Result<Vec<_>>>()?;
     let n_layers_to_fetch = layers_to_fetch.len();
+    let download_bytes: u64 = layers_to_fetch.iter().map(|(l, _)| l.layer.size()).sum();
+
     let printer = (!quiet).then(|| {
         let layer_progress = imp.request_progress();
         let layer_byte_progress = imp.request_layer_progress();
         tokio::task::spawn(async move {
-            handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch)
+            handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes)
                 .await
         })
     });
diff --git a/ostree-ext/src/container/store.rs b/ostree-ext/src/container/store.rs
index d4e4a4fe..efba13e5 100644
--- a/ostree-ext/src/container/store.rs
+++ b/ostree-ext/src/container/store.rs
@@ -193,7 +193,7 @@ pub enum PrepareResult {
 #[derive(Debug)]
 pub struct ManifestLayerState {
     /// The underlying layer descriptor.
-    pub(crate) layer: oci_image::Descriptor,
+    pub layer: oci_image::Descriptor,
     // TODO semver: Make this readonly via an accessor
     /// The ostree ref name for this layer.
     pub ostree_ref: String,

From 29a7959ca86a28e12060fb41ea8407c86eb71706 Mon Sep 17 00:00:00 2001
From: Antheas Kapenekakis
Date: Tue, 26 Nov 2024 00:28:40 +0100
Subject: [PATCH 2/2] feat: add json output to stderr of upgrade and switch

---
NOTE(review): reconstructed from the same damaged dump as patch 1/2; the same
caveats about indentation, blank context lines and generic arguments apply.
Changes relative to the original commit, all in
handle_layer_progress_print_jsonl / JsonProgress:
 * total_read is now assigned from saturating_add(); the original used `+=`,
   which added the previous total a second time for every finished layer and
   inflated done_bytes.
 * done_bytes uses saturating_add, matching the line above it.
 * the doc comment now says standard error, which is what eprintln! writes to.
 * cosmetic: derive spacing, inline format argument, expect() with a reason.

 lib/src/cli.rs     | 14 ++++++--
 lib/src/deploy.rs  | 88 +++++++++++++++++++++++++++++++++++++++++++---
 lib/src/install.rs |  2 +-
 3 files changed, 95 insertions(+), 9 deletions(-)

diff --git a/lib/src/cli.rs b/lib/src/cli.rs
index 9f19a6a6..3e6395d9 100644
--- a/lib/src/cli.rs
+++ b/lib/src/cli.rs
@@ -52,6 +52,10 @@ pub(crate) struct UpgradeOpts {
     /// a userspace-only restart.
     #[clap(long, conflicts_with = "check")]
     pub(crate) apply: bool,
+
+    /// Pipe download progress to stderr in a jsonl format.
+    #[clap(long)]
+    pub(crate) json: bool,
 }
 
 /// Perform an switch operation
@@ -101,6 +105,10 @@ pub(crate) struct SwitchOpts {
 
     /// Target image to use for the next boot.
     pub(crate) target: String,
+
+    /// Pipe download progress to stderr in a jsonl format.
+    #[clap(long)]
+    pub(crate) json: bool,
 }
 
 /// Options controlling rollback
@@ -670,7 +678,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
             }
         }
     } else {
-        let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet).await?;
+        let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, opts.json).await?;
         let staged_digest = staged_image.map(|s| s.digest().expect("valid digest in status"));
         let fetched_digest = &fetched.manifest_digest;
         tracing::debug!("staged: {staged_digest:?}");
@@ -764,7 +772,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
     }
     let new_spec = RequiredHostSpec::from_spec(&new_spec)?;
 
-    let fetched = crate::deploy::pull(repo, &target, None, opts.quiet).await?;
+    let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, opts.json).await?;
 
     if !opts.retain {
         // By default, we prune the previous ostree ref so it will go away after later upgrades
@@ -826,7 +834,7 @@ async fn edit(opts: EditOpts) -> Result<()> {
         return crate::deploy::rollback(sysroot).await;
     }
 
-    let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet).await?;
+    let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, false).await?;
 
     // TODO gc old layers here
 
diff --git a/lib/src/deploy.rs b/lib/src/deploy.rs
index 02472944..b561929e 100644
--- a/lib/src/deploy.rs
+++ b/lib/src/deploy.rs
@@ -45,6 +45,16 @@ pub(crate) struct ImageState {
     pub(crate) ostree_commit: String,
 }
 
+/// Download information
+#[derive(Debug, serde::Serialize)]
+pub struct JsonProgress {
+    pub done_bytes: u64,
+    pub download_bytes: u64,
+    pub image_bytes: u64,
+    pub n_layers: usize,
+    pub n_layers_done: usize,
+}
+
 impl<'a> RequiredHostSpec<'a> {
     /// Given a (borrowed) host specification, "unwrap" its internal
     /// options, giving a spec that is required to have a base container image.
@@ -233,6 +243,65 @@ async fn handle_layer_progress_print(
     }
 }
 
+/// Write container fetch progress to standard error as JSON lines.
+async fn handle_layer_progress_print_jsonl(
+    mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
+    mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
+    n_layers_to_fetch: usize,
+    download_bytes: u64,
+    image_bytes: u64,
+) {
+    let mut total_read = 0u64;
+    let mut layers_done: usize = 0;
+    let mut last_json_written = std::time::Instant::now();
+    loop {
+        tokio::select! {
+            // Always handle layer changes first.
+            biased;
+            layer = layers.recv() => {
+                if let Some(l) = layer {
+                    if !l.is_starting() {
+                        let layer = descriptor_of_progress(&l);
+                        layers_done += 1;
+                        total_read = total_read.saturating_add(layer.size());
+                    }
+                } else {
+                    // If the receiver is disconnected, then we're done
+                    break
+                };
+            },
+            r = layer_bytes.changed() => {
+                if r.is_err() {
+                    // If the receiver is disconnected, then we're done
+                    break
+                }
+                let bytes = layer_bytes.borrow();
+                if let Some(bytes) = &*bytes {
+                    let done_bytes = total_read.saturating_add(bytes.fetched);
+
+                    // Lets update the json output only on bytes fetched
+                    // They are common enough, anyhow. Debounce on time.
+                    let curr = std::time::Instant::now();
+                    if curr.duration_since(last_json_written).as_secs_f64() > 0.2 {
+                        let json = JsonProgress {
+                            done_bytes,
+                            download_bytes,
+                            image_bytes,
+                            n_layers: n_layers_to_fetch,
+                            n_layers_done: layers_done,
+                        };
+                        let json = serde_json::to_string(&json).expect("JsonProgress serializes infallibly");
+                        // Write to stderr so that consumer can filter this
+                        eprintln!("{json}");
+                        last_json_written = curr;
+                    }
+                }
+            }
+        }
+    }
+}
+
+
 /// Wrapper for pulling a container image, wiring up status output.
 #[context("Pulling")]
 pub(crate) async fn pull(
@@ -240,6 +309,7 @@ pub(crate) async fn pull(
     imgref: &ImageReference,
     target_imgref: Option<&OstreeImageReference>,
     quiet: bool,
+    json: bool,
 ) -> Result<Box<ImageState>> {
     let ostree_imgref = &OstreeImageReference::from(imgref.clone());
     let mut imp = new_importer(repo, ostree_imgref).await?;
@@ -261,14 +331,22 @@ pub(crate) async fn pull(
     let layers_to_fetch = prep.layers_to_fetch().collect::<Result<Vec<_>>>()?;
     let n_layers_to_fetch = layers_to_fetch.len();
     let download_bytes: u64 = layers_to_fetch.iter().map(|(l, _)| l.layer.size()).sum();
+    let image_bytes: u64 = prep.all_layers().map(|l| l.layer.size()).sum();
 
-    let printer = (!quiet).then(|| {
+    let printer = (!quiet || json).then(|| {
         let layer_progress = imp.request_progress();
         let layer_byte_progress = imp.request_layer_progress();
-        tokio::task::spawn(async move {
-            handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes)
-                .await
-        })
+        if json {
+            tokio::task::spawn(async move {
+                handle_layer_progress_print_jsonl(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes, image_bytes)
+                    .await
+            })
+        } else {
+            tokio::task::spawn(async move {
+                handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes)
+                    .await
+            })
+        }
     });
     let import = imp.import(prep).await;
     if let Some(printer) = printer {
diff --git a/lib/src/install.rs b/lib/src/install.rs
index 98857edf..69b5fab5 100644
--- a/lib/src/install.rs
+++ b/lib/src/install.rs
@@ -744,7 +744,7 @@ async fn install_container(
         let spec_imgref = ImageReference::from(src_imageref.clone());
         let repo = &sysroot.repo();
         repo.set_disable_fsync(true);
-        let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false).await?;
+        let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false, false).await?;
         repo.set_disable_fsync(false);
         r
     };