From e73b413d8e429e9f2b539c388c278e7f423c5ab5 Mon Sep 17 00:00:00 2001 From: Kyle Gospodnetich Date: Tue, 24 Dec 2024 19:20:04 -0800 Subject: [PATCH] chore: Update bootc --- spec_files/bootc/921.patch | 1079 +++++++++++++++++++++++++++-------- spec_files/bootc/bootc.spec | 16 +- 2 files changed, 846 insertions(+), 249 deletions(-) diff --git a/spec_files/bootc/921.patch b/spec_files/bootc/921.patch index f9b042b6..952e8836 100644 --- a/spec_files/bootc/921.patch +++ b/spec_files/bootc/921.patch @@ -1,95 +1,868 @@ -From 6b4cdaffe7a6d6aa0687d9fe92287231c04a01c7 Mon Sep 17 00:00:00 2001 +From 518435fb3eaf78b6e8c8d18e409076d2de3d6546 Mon Sep 17 00:00:00 2001 From: Antheas Kapenekakis -Date: Mon, 25 Nov 2024 23:56:08 +0100 -Subject: [PATCH 1/2] feat: add total progress bar +Date: Thu, 28 Nov 2024 00:12:26 +0100 +Subject: [PATCH] pull: Add --json-fd +This adds a generic "progress" infrastructure for granular +incremental notifications of downloading in particular, but +we may extend this to other generic tasks in the future too. + +Signed-off-by: Antheas Kapenekakis +Signed-off-by: Colin Walters --- - lib/src/deploy.rs | 22 +++++++++++++++++----- - ostree-ext/src/container/store.rs | 2 +- - 2 files changed, 18 insertions(+), 6 deletions(-) + lib/src/cli.rs | 36 +++- + lib/src/deploy.rs | 246 +++++++++++++++++++++++-- + lib/src/install.rs | 10 +- + lib/src/lib.rs | 1 + + lib/src/progress_jsonl.rs | 290 ++++++++++++++++++++++++++++++ + ostree-ext/src/cli.rs | 1 + + ostree-ext/src/container/store.rs | 8 +- + 7 files changed, 568 insertions(+), 24 deletions(-) + create mode 100644 lib/src/progress_jsonl.rs +diff --git a/lib/src/cli.rs b/lib/src/cli.rs +index e293fd940..8e429fe93 100644 +--- a/lib/src/cli.rs ++++ b/lib/src/cli.rs +@@ -4,6 +4,7 @@ + + use std::ffi::{CString, OsStr, OsString}; + use std::io::Seek; ++use std::os::fd::RawFd; + use std::os::unix::process::CommandExt; + use std::process::Command; + +@@ -25,6 +26,8 @@ use serde::{Deserialize, Serialize}; + + use crate::deploy::RequiredHostSpec; + use crate::lints; ++use crate::progress_jsonl; ++use crate::progress_jsonl::ProgressWriter; + use crate::spec::Host; + use crate::spec::ImageReference; + use crate::utils::sigpolicy_from_opts; +@@ -52,6 +55,10 @@ pub(crate) struct UpgradeOpts { + /// a userspace-only restart. + #[clap(long, conflicts_with = "check")] + pub(crate) apply: bool, ++ ++ /// Pipe download progress to this fd in a jsonl format. ++ #[clap(long)] ++ pub(crate) json_fd: Option, + } + + /// Perform an switch operation +@@ -101,6 +108,10 @@ pub(crate) struct SwitchOpts { + + /// Target image to use for the next boot. + pub(crate) target: String, ++ ++ /// Pipe download progress to this fd in a jsonl format. ++ #[clap(long)] ++ pub(crate) json_fd: Option, + } + + /// Options controlling rollback +@@ -644,6 +655,12 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> { + let (booted_deployment, _deployments, host) = + crate::status::get_status_require_booted(sysroot)?; + let imgref = host.spec.image.as_ref(); ++ let prog = opts ++ .json_fd ++ .map(progress_jsonl::ProgressWriter::from_raw_fd) ++ .transpose()? ++ .unwrap_or_default(); ++ + // If there's no specified image, let's be nice and check if the booted system is using rpm-ostree + if imgref.is_none() { + let booted_incompatible = host +@@ -700,7 +717,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> { + } + } + } else { +- let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet).await?; ++ let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, prog.clone()).await?; + let staged_digest = staged_image.map(|s| s.digest().expect("valid digest in status")); + let fetched_digest = &fetched.manifest_digest; + tracing::debug!("staged: {staged_digest:?}"); +@@ -723,7 +740,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> { + println!("No update available.") + } else { + let osname = booted_deployment.osname(); +- crate::deploy::stage(sysroot, &osname, &fetched, &spec).await?; ++ crate::deploy::stage(sysroot, &osname, &fetched, &spec, prog.clone()).await?; + changed = true; + if let Some(prev) = booted_image.as_ref() { + if let Some(fetched_manifest) = fetched.get_manifest(repo)? { +@@ -759,6 +776,11 @@ async fn switch(opts: SwitchOpts) -> Result<()> { + ); + let target = ostree_container::OstreeImageReference { sigverify, imgref }; + let target = ImageReference::from(target); ++ let prog = opts ++ .json_fd ++ .map(progress_jsonl::ProgressWriter::from_raw_fd) ++ .transpose()? ++ .unwrap_or_default(); + + // If we're doing an in-place mutation, we shortcut most of the rest of the work here + if opts.mutate_in_place { +@@ -794,7 +816,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> { + } + let new_spec = RequiredHostSpec::from_spec(&new_spec)?; + +- let fetched = crate::deploy::pull(repo, &target, None, opts.quiet).await?; ++ let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, prog.clone()).await?; + + if !opts.retain { + // By default, we prune the previous ostree ref so it will go away after later upgrades +@@ -808,7 +830,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> { + } + + let stateroot = booted_deployment.osname(); +- crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec).await?; ++ crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec, prog.clone()).await?; + + if opts.apply { + crate::reboot::reboot()?; +@@ -850,18 +872,20 @@ async fn edit(opts: EditOpts) -> Result<()> { + host.spec.verify_transition(&new_host.spec)?; + let new_spec = RequiredHostSpec::from_spec(&new_host.spec)?; + ++ let prog = ProgressWriter::default(); ++ + // We only support two state transitions right now; switching the image, + // or flipping the bootloader ordering. + if host.spec.boot_order != new_host.spec.boot_order { + return crate::deploy::rollback(sysroot).await; + } + +- let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet).await?; ++ let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, prog.clone()).await?; + + // TODO gc old layers here + + let stateroot = booted_deployment.osname(); +- crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec).await?; ++ crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec, prog.clone()).await?; + + Ok(()) + } diff --git a/lib/src/deploy.rs b/lib/src/deploy.rs -index 960c1abd..02472944 100644 +index 960c1abde..7196e2881 100644 --- a/lib/src/deploy.rs +++ b/lib/src/deploy.rs -@@ -142,6 +142,7 @@ async fn handle_layer_progress_print( +@@ -21,6 +21,7 @@ use ostree_ext::ostree::{self, Sysroot}; + use ostree_ext::sysroot::SysrootLock; + use ostree_ext::tokio_util::spawn_blocking_cancellable_flatten; + ++use crate::progress_jsonl::{Event, ProgressWriter, SubTaskBytes, SubTaskStep, API_VERSION}; + use crate::spec::ImageReference; + use crate::spec::{BootOrder, HostSpec}; + use crate::status::labels_of_config; +@@ -141,11 +142,20 @@ fn prefix_of_progress(p: &ImportProgress) -> &'static str { + async fn handle_layer_progress_print( mut layers: tokio::sync::mpsc::Receiver, mut layer_bytes: tokio::sync::watch::Receiver>, ++ digest: Box, n_layers_to_fetch: usize, -+ download_bytes: u64, ++ layers_total: usize, ++ bytes_to_download: u64, ++ bytes_total: u64, ++ prog: ProgressWriter, ++ quiet: bool, ) { let start = std::time::Instant::now(); let mut total_read = 0u64; -@@ -150,23 +151,30 @@ async fn handle_layer_progress_print( + let bar = indicatif::MultiProgress::new(); ++ if quiet { ++ bar.set_draw_target(indicatif::ProgressDrawTarget::hidden()); ++ } + let layers_bar = bar.add(indicatif::ProgressBar::new( n_layers_to_fetch.try_into().unwrap(), )); - let byte_bar = bar.add(indicatif::ProgressBar::new(0)); -+ let total_byte_bar = bar.add(indicatif::ProgressBar::new(download_bytes)); - // let byte_bar = indicatif::ProgressBar::new(0); - // byte_bar.set_draw_target(indicatif::ProgressDrawTarget::hidden()); -+ println!(""); - layers_bar.set_style( - indicatif::ProgressStyle::default_bar() -- .template("{prefix} {bar} {pos}/{len} {wide_msg}") -+ .template("{prefix} {pos}/{len} {bar:15}") +@@ -157,7 +167,8 @@ async fn handle_layer_progress_print( + .template("{prefix} {bar} {pos}/{len} {wide_msg}") .unwrap(), ); - layers_bar.set_prefix("Fetching layers"); -+ layers_bar.set_prefix("Fetched Layers"); ++ let taskname = "Fetching layers"; ++ layers_bar.set_prefix(taskname); layers_bar.set_message(""); -- byte_bar.set_prefix("Fetching"); + byte_bar.set_prefix("Fetching"); byte_bar.set_style( - indicatif::ProgressStyle::default_bar() - .template( -- " └ {prefix} {bar} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) {wide_msg}", -+ " └ {bar:20} {msg} ({binary_bytes}/{binary_total_bytes})", +@@ -167,6 +178,9 @@ async fn handle_layer_progress_print( ) .unwrap() ); -+ total_byte_bar.set_prefix("Total"); -+ total_byte_bar.set_style( -+ indicatif::ProgressStyle::default_bar() -+ .template("\n{prefix} {bar:30} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}, {elapsed}/{duration})") -+ .unwrap(), -+ ); ++ ++ let mut subtasks = vec![]; ++ let mut subtask: SubTaskBytes = Default::default(); loop { tokio::select! { // Always handle layer changes first. -@@ -186,6 +194,7 @@ async fn handle_layer_progress_print( +@@ -174,18 +188,44 @@ async fn handle_layer_progress_print( + layer = layers.recv() => { + if let Some(l) = layer { + let layer = descriptor_of_progress(&l); ++ let layer_type = prefix_of_progress(&l); ++ let short_digest = &layer.digest().digest()[0..21]; + let layer_size = layer.size(); + if l.is_starting() { ++ // Reset the progress bar + byte_bar.reset_elapsed(); + byte_bar.reset_eta(); + byte_bar.set_length(layer_size); +- let layer_type = prefix_of_progress(&l); +- let short_digest = &layer.digest().digest()[0..21]; + byte_bar.set_message(format!("{layer_type} {short_digest}")); ++ ++ subtask = SubTaskBytes { ++ subtask: layer_type.into(), ++ description: format!("{layer_type}: {short_digest}").clone().into(), ++ id: format!("{short_digest}").clone().into(), ++ bytes_cached: 0, ++ bytes: 0, ++ bytes_total: layer_size, ++ }; + } else { byte_bar.set_position(layer_size); layers_bar.inc(1); total_read = total_read.saturating_add(layer_size); -+ total_byte_bar.set_position(total_read); ++ // Emit an event where bytes == total to signal completion. ++ subtask.bytes = layer_size; ++ subtasks.push(subtask.clone()); ++ prog.send(Event::ProgressBytes { ++ api_version: API_VERSION.into(), ++ task: "pulling".into(), ++ description: format!("Pulling Image: {digest}").into(), ++ id: (*digest).into(), ++ bytes_cached: bytes_total - bytes_to_download, ++ bytes: total_read, ++ bytes_total: bytes_to_download, ++ steps_cached: (layers_total - n_layers_to_fetch) as u64, ++ steps: layers_bar.position(), ++ steps_total: n_layers_to_fetch as u64, ++ subtasks: subtasks.clone(), ++ }).await; } } else { // If the receiver is disconnected, then we're done -@@ -200,6 +209,7 @@ async fn handle_layer_progress_print( - let bytes = layer_bytes.borrow(); - if let Some(bytes) = &*bytes { +@@ -197,9 +237,26 @@ async fn handle_layer_progress_print( + // If the receiver is disconnected, then we're done + break + } +- let bytes = layer_bytes.borrow(); +- if let Some(bytes) = &*bytes { ++ let bytes = { ++ let bytes = layer_bytes.borrow_and_update(); ++ bytes.as_ref().cloned() ++ }; ++ if let Some(bytes) = bytes { byte_bar.set_position(bytes.fetched); -+ total_byte_bar.set_position(total_read + bytes.fetched); ++ subtask.bytes = byte_bar.position(); ++ prog.send_lossy(Event::ProgressBytes { ++ api_version: API_VERSION.into(), ++ task: "pulling".into(), ++ description: format!("Pulling Image: {digest}").into(), ++ id: (*digest).into(), ++ bytes_cached: bytes_total - bytes_to_download, ++ bytes: total_read + byte_bar.position(), ++ bytes_total: bytes_to_download, ++ steps_cached: (layers_total - n_layers_to_fetch) as u64, ++ steps: layers_bar.position(), ++ steps_total: n_layers_to_fetch as u64, ++ subtasks: subtasks.clone().into_iter().chain([subtask.clone()]).collect(), ++ }).await; } } } -@@ -250,11 +260,13 @@ pub(crate) async fn pull( +@@ -221,6 +278,27 @@ async fn handle_layer_progress_print( + )) { + tracing::warn!("writing to stdout: {e}"); + } ++ ++ // Since the progress notifier closed, we know import has started ++ // use as a heuristic to begin import progress ++ // Cannot be lossy or it is dropped ++ prog.send(Event::ProgressSteps { ++ api_version: API_VERSION.into(), ++ task: "importing".into(), ++ description: "Importing Image".into(), ++ id: (*digest).into(), ++ steps_cached: 0, ++ steps: 0, ++ steps_total: 1, ++ subtasks: [SubTaskStep { ++ subtask: "importing".into(), ++ description: "Importing Image".into(), ++ id: "importing".into(), ++ completed: false, ++ }] ++ .into(), ++ }) ++ .await; + } + + /// Wrapper for pulling a container image, wiring up status output. +@@ -230,6 +308,7 @@ pub(crate) async fn pull( + imgref: &ImageReference, + target_imgref: Option<&OstreeImageReference>, + quiet: bool, ++ prog: ProgressWriter, + ) -> Result> { + let ostree_imgref = &OstreeImageReference::from(imgref.clone()); + let mut imp = new_importer(repo, ostree_imgref).await?; +@@ -250,20 +329,52 @@ pub(crate) async fn pull( ostree_ext::cli::print_layer_status(&prep); let layers_to_fetch = prep.layers_to_fetch().collect::>>()?; let n_layers_to_fetch = layers_to_fetch.len(); -+ let download_bytes: u64 = layers_to_fetch.iter().map(|(l, _)| l.layer.size()).sum(); -+ - let printer = (!quiet).then(|| { - let layer_progress = imp.request_progress(); - let layer_byte_progress = imp.request_layer_progress(); - tokio::task::spawn(async move { +- let printer = (!quiet).then(|| { +- let layer_progress = imp.request_progress(); +- let layer_byte_progress = imp.request_layer_progress(); +- tokio::task::spawn(async move { - handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch) -+ handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes) - .await - }) +- .await +- }) ++ let layers_total = prep.all_layers().count(); ++ let bytes_to_fetch: u64 = layers_to_fetch.iter().map(|(l, _)| l.layer.size()).sum(); ++ let bytes_total: u64 = prep.all_layers().map(|l| l.layer.size()).sum(); ++ ++ let prog_print = prog.clone(); ++ let digest = prep.manifest_digest.clone(); ++ let digest_imp = prep.manifest_digest.clone(); ++ let layer_progress = imp.request_progress(); ++ let layer_byte_progress = imp.request_layer_progress(); ++ let printer = tokio::task::spawn(async move { ++ handle_layer_progress_print( ++ layer_progress, ++ layer_byte_progress, ++ digest.as_ref().into(), ++ n_layers_to_fetch, ++ layers_total, ++ bytes_to_fetch, ++ bytes_total, ++ prog_print, ++ quiet, ++ ) ++ .await }); + let import = imp.import(prep).await; +- if let Some(printer) = printer { +- let _ = printer.await; +- } ++ let _ = printer.await; ++ // Both the progress and the import are done, so import is done as well ++ prog.send(Event::ProgressSteps { ++ api_version: API_VERSION.into(), ++ task: "importing".into(), ++ description: "Importing Image".into(), ++ id: digest_imp.clone().as_ref().into(), ++ steps_cached: 0, ++ steps: 1, ++ steps_total: 1, ++ subtasks: [SubTaskStep { ++ subtask: "importing".into(), ++ description: "Importing Image".into(), ++ id: "importing".into(), ++ completed: true, ++ }] ++ .into(), ++ }) ++ .await; + let import = import?; + let wrote_imgref = target_imgref.as_ref().unwrap_or(&ostree_imgref); ++ + if let Some(msg) = + ostree_container::store::image_filtered_content_warning(repo, &wrote_imgref.imgref) + .context("Image content warning")? +@@ -450,8 +561,53 @@ pub(crate) async fn stage( + stateroot: &str, + image: &ImageState, + spec: &RequiredHostSpec<'_>, ++ prog: ProgressWriter, + ) -> Result<()> { ++ let mut subtask = SubTaskStep { ++ subtask: "merging".into(), ++ description: "Merging Image".into(), ++ id: "fetching".into(), ++ completed: false, ++ }; ++ let mut subtasks = vec![]; ++ prog.send(Event::ProgressSteps { ++ api_version: API_VERSION.into(), ++ task: "staging".into(), ++ description: "Deploying Image".into(), ++ id: image.manifest_digest.clone().as_ref().into(), ++ steps_cached: 0, ++ steps: 0, ++ steps_total: 3, ++ subtasks: subtasks ++ .clone() ++ .into_iter() ++ .chain([subtask.clone()]) ++ .collect(), ++ }) ++ .await; + let merge_deployment = sysroot.merge_deployment(Some(stateroot)); ++ ++ subtask.completed = true; ++ subtasks.push(subtask.clone()); ++ subtask.subtask = "deploying".into(); ++ subtask.id = "deploying".into(); ++ subtask.description = "Deploying Image".into(); ++ subtask.completed = false; ++ prog.send(Event::ProgressSteps { ++ api_version: API_VERSION.into(), ++ task: "staging".into(), ++ description: "Deploying Image".into(), ++ id: image.manifest_digest.clone().as_ref().into(), ++ steps_cached: 0, ++ steps: 1, ++ steps_total: 3, ++ subtasks: subtasks ++ .clone() ++ .into_iter() ++ .chain([subtask.clone()]) ++ .collect(), ++ }) ++ .await; + let origin = origin_from_imageref(spec.image)?; + let deployment = crate::deploy::deploy( + sysroot, +@@ -462,8 +618,50 @@ pub(crate) async fn stage( + ) + .await?; + ++ subtask.completed = true; ++ subtasks.push(subtask.clone()); ++ subtask.subtask = "bound_images".into(); ++ subtask.id = "bound_images".into(); ++ subtask.description = "Pulling Bound Images".into(); ++ subtask.completed = false; ++ prog.send(Event::ProgressSteps { ++ api_version: API_VERSION.into(), ++ task: "staging".into(), ++ description: "Deploying Image".into(), ++ id: image.manifest_digest.clone().as_ref().into(), ++ steps_cached: 0, ++ steps: 1, ++ steps_total: 3, ++ subtasks: subtasks ++ .clone() ++ .into_iter() ++ .chain([subtask.clone()]) ++ .collect(), ++ }) ++ .await; + crate::boundimage::pull_bound_images(sysroot, &deployment).await?; + ++ subtask.completed = true; ++ subtasks.push(subtask.clone()); ++ subtask.subtask = "cleanup".into(); ++ subtask.id = "cleanup".into(); ++ subtask.description = "Removing old images".into(); ++ subtask.completed = false; ++ prog.send(Event::ProgressSteps { ++ api_version: API_VERSION.into(), ++ task: "staging".into(), ++ description: "Deploying Image".into(), ++ id: image.manifest_digest.clone().as_ref().into(), ++ steps_cached: 0, ++ steps: 2, ++ steps_total: 3, ++ subtasks: subtasks ++ .clone() ++ .into_iter() ++ .chain([subtask.clone()]) ++ .collect(), ++ }) ++ .await; + crate::deploy::cleanup(sysroot).await?; + println!("Queued for next boot: {:#}", spec.image); + if let Some(version) = image.version.as_deref() { +@@ -471,6 +669,24 @@ pub(crate) async fn stage( + } + println!(" Digest: {}", image.manifest_digest); + ++ subtask.completed = true; ++ subtasks.push(subtask.clone()); ++ prog.send(Event::ProgressSteps { ++ api_version: API_VERSION.into(), ++ task: "staging".into(), ++ description: "Deploying Image".into(), ++ id: image.manifest_digest.clone().as_ref().into(), ++ steps_cached: 0, ++ steps: 3, ++ steps_total: 3, ++ subtasks: subtasks ++ .clone() ++ .into_iter() ++ .chain([subtask.clone()]) ++ .collect(), ++ }) ++ .await; ++ + Ok(()) + } + +diff --git a/lib/src/install.rs b/lib/src/install.rs +index 548c3e8d3..2834ae44f 100644 +--- a/lib/src/install.rs ++++ b/lib/src/install.rs +@@ -48,6 +48,7 @@ use crate::boundimage::{BoundImage, ResolvedBoundImage}; + use crate::containerenv::ContainerExecutionInfo; + use crate::lsm; + use crate::mount::Filesystem; ++use crate::progress_jsonl::ProgressWriter; + use crate::spec::ImageReference; + use crate::store::Storage; + use crate::task::Task; +@@ -733,7 +734,14 @@ async fn install_container( + let spec_imgref = ImageReference::from(src_imageref.clone()); + let repo = &sysroot.repo(); + repo.set_disable_fsync(true); +- let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false).await?; ++ let r = crate::deploy::pull( ++ repo, ++ &spec_imgref, ++ Some(&state.target_imgref), ++ false, ++ ProgressWriter::default(), ++ ) ++ .await?; + repo.set_disable_fsync(false); + r + }; +diff --git a/lib/src/lib.rs b/lib/src/lib.rs +index 1f0c263b5..f90ef3665 100644 +--- a/lib/src/lib.rs ++++ b/lib/src/lib.rs +@@ -41,3 +41,4 @@ pub mod spec; + mod docgen; + mod glyph; + mod imgstorage; ++mod progress_jsonl; +diff --git a/lib/src/progress_jsonl.rs b/lib/src/progress_jsonl.rs +new file mode 100644 +index 000000000..8e97a7631 +--- /dev/null ++++ b/lib/src/progress_jsonl.rs +@@ -0,0 +1,290 @@ ++//! Output progress data using the json-lines format. For more information ++//! see . ++ ++use anyhow::Result; ++use fn_error_context::context; ++use serde::Serialize; ++use std::borrow::Cow; ++use std::os::fd::{FromRawFd, OwnedFd, RawFd}; ++use std::sync::Arc; ++use std::time::Instant; ++use tokio::io::{AsyncWriteExt, BufWriter}; ++use tokio::net::unix::pipe::Sender; ++use tokio::sync::Mutex; ++ ++// Maximum number of times per second that an event will be written. ++const REFRESH_HZ: u16 = 5; ++ ++pub const API_VERSION: &str = "org.containers.bootc.progress/v1"; ++ ++/// An incremental update to e.g. a container image layer download. ++/// The first time a given "subtask" name is seen, a new progress bar should be created. ++/// If bytes == bytes_total, then the subtask is considered complete. ++#[derive(Debug, serde::Serialize, serde::Deserialize, Default, Clone)] ++#[serde(rename_all = "camelCase")] ++pub struct SubTaskBytes<'t> { ++ /// A machine readable type for the task (used for i18n). ++ /// (e.g., "ostree_chunk", "ostree_derived") ++ #[serde(borrow)] ++ pub subtask: Cow<'t, str>, ++ /// A human readable description of the task if i18n is not available. ++ /// (e.g., "OSTree Chunk:", "Derived Layer:") ++ #[serde(borrow)] ++ pub description: Cow<'t, str>, ++ /// A human and machine readable identifier for the task ++ /// (e.g., ostree chunk/layer hash). ++ #[serde(borrow)] ++ pub id: Cow<'t, str>, ++ /// The number of bytes fetched by a previous run (e.g., zstd_chunked). ++ pub bytes_cached: u64, ++ /// Updated byte level progress ++ pub bytes: u64, ++ /// Total number of bytes ++ pub bytes_total: u64, ++} ++ ++/// Marks the beginning and end of a dictrete step ++#[derive(Debug, serde::Serialize, serde::Deserialize, Default, Clone)] ++#[serde(rename_all = "camelCase")] ++pub struct SubTaskStep<'t> { ++ /// A machine readable type for the task (used for i18n). ++ /// (e.g., "ostree_chunk", "ostree_derived") ++ #[serde(borrow)] ++ pub subtask: Cow<'t, str>, ++ /// A human readable description of the task if i18n is not available. ++ /// (e.g., "OSTree Chunk:", "Derived Layer:") ++ #[serde(borrow)] ++ pub description: Cow<'t, str>, ++ /// A human and machine readable identifier for the task ++ /// (e.g., ostree chunk/layer hash). ++ #[serde(borrow)] ++ pub id: Cow<'t, str>, ++ /// Starts as false when beginning to execute and turns true when completed. ++ pub completed: bool, ++} ++ ++/// An event emitted as JSON. ++#[derive(Debug, serde::Serialize, serde::Deserialize)] ++#[serde( ++ tag = "type", ++ rename_all = "PascalCase", ++ rename_all_fields = "camelCase" ++)] ++pub enum Event<'t> { ++ /// An incremental update to a container image layer download ++ ProgressBytes { ++ /// The version of the progress event format. ++ #[serde(borrow)] ++ api_version: Cow<'t, str>, ++ /// A machine readable type (e.g., pulling) for the task (used for i18n ++ /// and UI customization). ++ #[serde(borrow)] ++ task: Cow<'t, str>, ++ /// A human readable description of the task if i18n is not available. ++ #[serde(borrow)] ++ description: Cow<'t, str>, ++ /// A human and machine readable unique identifier for the task ++ /// (e.g., the image name). For tasks that only happen once, ++ /// it can be set to the same value as task. ++ #[serde(borrow)] ++ id: Cow<'t, str>, ++ /// The number of bytes fetched by a previous run. ++ bytes_cached: u64, ++ /// The number of bytes already fetched. ++ bytes: u64, ++ /// Total number of bytes. If zero, then this should be considered "unspecified". ++ bytes_total: u64, ++ /// The number of steps fetched by a previous run. ++ steps_cached: u64, ++ /// The initial position of progress. ++ steps: u64, ++ /// The total number of steps (e.g. container image layers, RPMs) ++ steps_total: u64, ++ /// The currently running subtasks. ++ subtasks: Vec>, ++ }, ++ /// An incremental update with discrete steps ++ ProgressSteps { ++ /// The version of the progress event format. ++ #[serde(borrow)] ++ api_version: Cow<'t, str>, ++ /// A machine readable type (e.g., pulling) for the task (used for i18n ++ /// and UI customization). ++ #[serde(borrow)] ++ task: Cow<'t, str>, ++ /// A human readable description of the task if i18n is not available. ++ #[serde(borrow)] ++ description: Cow<'t, str>, ++ /// A human and machine readable unique identifier for the task ++ /// (e.g., the image name). For tasks that only happen once, ++ /// it can be set to the same value as task. ++ #[serde(borrow)] ++ id: Cow<'t, str>, ++ /// The number of steps fetched by a previous run. ++ steps_cached: u64, ++ /// The initial position of progress. ++ steps: u64, ++ /// The total number of steps (e.g. container image layers, RPMs) ++ steps_total: u64, ++ /// The currently running subtasks. ++ subtasks: Vec>, ++ }, ++} ++ ++#[derive(Debug)] ++struct ProgressWriterInner { ++ last_write: Option, ++ fd: BufWriter, ++} ++ ++#[derive(Clone, Debug, Default)] ++pub(crate) struct ProgressWriter { ++ inner: Arc>>, ++} ++ ++impl TryFrom for ProgressWriter { ++ type Error = anyhow::Error; ++ ++ fn try_from(value: OwnedFd) -> Result { ++ let value = Sender::from_owned_fd(value)?; ++ Ok(Self::from(value)) ++ } ++} ++ ++impl From for ProgressWriter { ++ fn from(value: Sender) -> Self { ++ let inner = ProgressWriterInner { ++ last_write: None, ++ fd: BufWriter::new(value), ++ }; ++ Self { ++ inner: Arc::new(Some(inner).into()), ++ } ++ } ++} ++ ++impl ProgressWriter { ++ /// Given a raw file descriptor, create an instance of a json-lines writer. ++ #[allow(unsafe_code)] ++ #[context("Creating progress writer")] ++ pub(crate) fn from_raw_fd(fd: RawFd) -> Result { ++ unsafe { OwnedFd::from_raw_fd(fd) }.try_into() ++ } ++ ++ /// Serialize the target object to JSON as a single line ++ pub(crate) async fn send_impl(&self, v: T, required: bool) -> Result<()> { ++ let mut guard = self.inner.lock().await; ++ // Check if we have an inner value; if not, nothing to do. ++ let Some(inner) = guard.as_mut() else { ++ return Ok(()); ++ }; ++ ++ // For messages that can be dropped, if we already sent an update within this cycle, discard this one. ++ // TODO: Also consider querying the pipe buffer and also dropping if we can't do this write. ++ let now = Instant::now(); ++ if !required { ++ const REFRESH_MS: u32 = 1000 / REFRESH_HZ as u32; ++ if let Some(elapsed) = inner.last_write.map(|w| now.duration_since(w)) { ++ if elapsed.as_millis() < REFRESH_MS.into() { ++ return Ok(()); ++ } ++ } ++ } ++ ++ // SAFETY: Propagating panics from the mutex here is intentional ++ // serde is guaranteed not to output newlines here ++ let buf = serde_json::to_vec(&v)?; ++ inner.fd.write_all(&buf).await?; ++ // We always end in a newline ++ inner.fd.write_all(b"\n").await?; ++ // And flush to ensure the remote side sees updates immediately ++ inner.fd.flush().await?; ++ // Update the last write time ++ inner.last_write = Some(now); ++ Ok(()) ++ } ++ ++ /// Send an event. ++ pub(crate) async fn send(&self, v: T) { ++ if let Err(e) = self.send_impl(v, true).await { ++ eprintln!("Failed to write to jsonl: {}", e); ++ // Stop writing to fd but let process continue ++ // SAFETY: Propagating panics from the mutex here is intentional ++ let _ = self.inner.lock().await.take(); ++ } ++ } ++ ++ /// Send an event that can be dropped. ++ pub(crate) async fn send_lossy(&self, v: T) { ++ if let Err(e) = self.send_impl(v, false).await { ++ eprintln!("Failed to write to jsonl: {}", e); ++ // Stop writing to fd but let process continue ++ // SAFETY: Propagating panics from the mutex here is intentional ++ let _ = self.inner.lock().await.take(); ++ } ++ } ++ ++ /// Flush remaining data and return the underlying file. ++ #[allow(dead_code)] ++ pub(crate) async fn into_inner(self) -> Result> { ++ // SAFETY: Propagating panics from the mutex here is intentional ++ let mut mutex = self.inner.lock().await; ++ if let Some(inner) = mutex.take() { ++ Ok(Some(inner.fd.into_inner())) ++ } else { ++ Ok(None) ++ } ++ } ++} ++ ++#[cfg(test)] ++mod test { ++ use serde::Deserialize; ++ use tokio::io::{AsyncBufReadExt, BufReader}; ++ ++ use super::*; ++ ++ #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)] ++ struct S { ++ s: String, ++ v: u32, ++ } ++ ++ #[tokio::test] ++ async fn test_jsonl() -> Result<()> { ++ let testvalues = [ ++ S { ++ s: "foo".into(), ++ v: 42, ++ }, ++ S { ++ // Test with an embedded newline to sanity check that serde doesn't write it literally ++ s: "foo\nbar".into(), ++ v: 0, ++ }, ++ ]; ++ let (send, recv) = tokio::net::unix::pipe::pipe()?; ++ let testvalues_sender = &testvalues; ++ let sender = async move { ++ let w = ProgressWriter::try_from(send)?; ++ for value in testvalues_sender { ++ w.send(value).await; ++ } ++ anyhow::Ok(()) ++ }; ++ let testvalues = &testvalues; ++ let receiver = async move { ++ let tf = BufReader::new(recv); ++ let mut expected = testvalues.iter(); ++ let mut lines = tf.lines(); ++ while let Some(line) = lines.next_line().await? { ++ let found: S = serde_json::from_str(&line)?; ++ let expected = expected.next().unwrap(); ++ assert_eq!(&found, expected); ++ } ++ anyhow::Ok(()) ++ }; ++ tokio::try_join!(sender, receiver)?; ++ Ok(()) ++ } ++} +diff --git a/ostree-ext/src/cli.rs b/ostree-ext/src/cli.rs +index fa60ccf41..e63e4240e 100644 +--- a/ostree-ext/src/cli.rs ++++ b/ostree-ext/src/cli.rs +@@ -657,6 +657,7 @@ pub async fn handle_layer_progress_print( + pub fn print_layer_status(prep: &PreparedImport) { + if let Some(status) = prep.format_layer_status() { + println!("{status}"); ++ let _ = std::io::stdout().flush(); + } + } + diff --git a/ostree-ext/src/container/store.rs b/ostree-ext/src/container/store.rs -index d4e4a4fe..efba13e5 100644 +index d4e4a4fec..b2a57a86a 100644 --- a/ostree-ext/src/container/store.rs +++ b/ostree-ext/src/container/store.rs +@@ -102,7 +102,7 @@ impl ImportProgress { + } + + /// Sent across a channel to track the byte-level progress of a layer fetch. +-#[derive(Debug)] ++#[derive(Clone, Debug)] + pub struct LayerProgress { + /// Index of the layer in the manifest + pub layer_index: usize, @@ -193,7 +193,7 @@ pub enum PrepareResult { #[derive(Debug)] pub struct ManifestLayerState { @@ -99,204 +872,14 @@ index d4e4a4fe..efba13e5 100644 // TODO semver: Make this readonly via an accessor /// The ostree ref name for this layer. pub ostree_ref: String, - -From 29a7959ca86a28e12060fb41ea8407c86eb71706 Mon Sep 17 00:00:00 2001 -From: Antheas Kapenekakis -Date: Tue, 26 Nov 2024 00:28:40 +0100 -Subject: [PATCH 2/2] feat: add json output to stderr of upgrade and switch - ---- - lib/src/cli.rs | 14 ++++++-- - lib/src/deploy.rs | 88 +++++++++++++++++++++++++++++++++++++++++++--- - lib/src/install.rs | 2 +- - 3 files changed, 95 insertions(+), 9 deletions(-) - -diff --git a/lib/src/cli.rs b/lib/src/cli.rs -index 9f19a6a6..3e6395d9 100644 ---- a/lib/src/cli.rs -+++ b/lib/src/cli.rs -@@ -52,6 +52,10 @@ pub(crate) struct UpgradeOpts { - /// a userspace-only restart. - #[clap(long, conflicts_with = "check")] - pub(crate) apply: bool, +@@ -952,6 +952,10 @@ impl ImageImporter { + proxy.finalize().await?; + tracing::debug!("finalized proxy"); + ++ // Disconnect progress notifiers to signal we're done with fetching. ++ let _ = self.layer_byte_progress.take(); ++ let _ = self.layer_progress.take(); + -+ /// Pipe download progress to stderr in a jsonl format. -+ #[clap(long)] -+ pub(crate) json: bool, - } - - /// Perform an switch operation -@@ -101,6 +105,10 @@ pub(crate) struct SwitchOpts { - - /// Target image to use for the next boot. - pub(crate) target: String, -+ -+ /// Pipe download progress to stderr in a jsonl format. -+ #[clap(long)] -+ pub(crate) json: bool, - } - - /// Options controlling rollback -@@ -670,7 +678,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> { - } - } - } else { -- let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet).await?; -+ let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, opts.json).await?; - let staged_digest = staged_image.map(|s| s.digest().expect("valid digest in status")); - let fetched_digest = &fetched.manifest_digest; - tracing::debug!("staged: {staged_digest:?}"); -@@ -764,7 +772,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> { - } - let new_spec = RequiredHostSpec::from_spec(&new_spec)?; - -- let fetched = crate::deploy::pull(repo, &target, None, opts.quiet).await?; -+ let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, opts.json).await?; - - if !opts.retain { - // By default, we prune the previous ostree ref so it will go away after later upgrades -@@ -826,7 +834,7 @@ async fn edit(opts: EditOpts) -> Result<()> { - return crate::deploy::rollback(sysroot).await; - } - -- let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet).await?; -+ let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, false).await?; - - // TODO gc old layers here - -diff --git a/lib/src/deploy.rs b/lib/src/deploy.rs -index 02472944..b561929e 100644 ---- a/lib/src/deploy.rs -+++ b/lib/src/deploy.rs -@@ -45,6 +45,16 @@ pub(crate) struct ImageState { - pub(crate) ostree_commit: String, - } - -+/// Download information -+#[derive(Debug,serde::Serialize)] -+pub struct JsonProgress { -+ pub done_bytes: u64, -+ pub download_bytes: u64, -+ pub image_bytes: u64, -+ pub n_layers: usize, -+ pub n_layers_done: usize, -+} -+ - impl<'a> RequiredHostSpec<'a> { - /// Given a (borrowed) host specification, "unwrap" its internal - /// options, giving a spec that is required to have a base container image. -@@ -233,6 +243,65 @@ async fn handle_layer_progress_print( - } - } - -+/// Write container fetch progress to standard output. -+async fn handle_layer_progress_print_jsonl( -+ mut layers: tokio::sync::mpsc::Receiver, -+ mut layer_bytes: tokio::sync::watch::Receiver>, -+ n_layers_to_fetch: usize, -+ download_bytes: u64, -+ image_bytes: u64, -+) { -+ let mut total_read = 0u64; -+ let mut layers_done: usize = 0; -+ let mut last_json_written = std::time::Instant::now(); -+ loop { -+ tokio::select! { -+ // Always handle layer changes first. -+ biased; -+ layer = layers.recv() => { -+ if let Some(l) = layer { -+ if !l.is_starting() { -+ let layer = descriptor_of_progress(&l); -+ layers_done += 1; -+ total_read += total_read.saturating_add(layer.size()); -+ } -+ } else { -+ // If the receiver is disconnected, then we're done -+ break -+ }; -+ }, -+ r = layer_bytes.changed() => { -+ if r.is_err() { -+ // If the receiver is disconnected, then we're done -+ break -+ } -+ let bytes = layer_bytes.borrow(); -+ if let Some(bytes) = &*bytes { -+ let done_bytes = total_read + bytes.fetched; -+ -+ // Lets update the json output only on bytes fetched -+ // They are common enough, anyhow. Debounce on time. -+ let curr = std::time::Instant::now(); -+ if curr.duration_since(last_json_written).as_secs_f64() > 0.2 { -+ let json = JsonProgress { -+ done_bytes, -+ download_bytes, -+ image_bytes, -+ n_layers: n_layers_to_fetch, -+ n_layers_done: layers_done, -+ }; -+ let json = serde_json::to_string(&json).unwrap(); -+ // Write to stderr so that consumer can filter this -+ eprintln!("{}", json); -+ last_json_written = curr; -+ } -+ } -+ } -+ } -+ } -+} -+ -+ - /// Wrapper for pulling a container image, wiring up status output. - #[context("Pulling")] - pub(crate) async fn pull( -@@ -240,6 +309,7 @@ pub(crate) async fn pull( - imgref: &ImageReference, - target_imgref: Option<&OstreeImageReference>, - quiet: bool, -+ json: bool, - ) -> Result> { - let ostree_imgref = &OstreeImageReference::from(imgref.clone()); - let mut imp = new_importer(repo, ostree_imgref).await?; -@@ -261,14 +331,22 @@ pub(crate) async fn pull( - let layers_to_fetch = prep.layers_to_fetch().collect::>>()?; - let n_layers_to_fetch = layers_to_fetch.len(); - let download_bytes: u64 = layers_to_fetch.iter().map(|(l, _)| l.layer.size()).sum(); -+ let image_bytes: u64 = prep.all_layers().map(|l| l.layer.size()).sum(); - -- let printer = (!quiet).then(|| { -+ let printer = (!quiet || json).then(|| { - let layer_progress = imp.request_progress(); - let layer_byte_progress = imp.request_layer_progress(); -- tokio::task::spawn(async move { -- handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes) -- .await -- }) -+ if json { -+ tokio::task::spawn(async move { -+ handle_layer_progress_print_jsonl(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes, image_bytes) -+ .await -+ }) -+ } else { -+ tokio::task::spawn(async move { -+ handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes) -+ .await -+ }) -+ } - }); - let import = imp.import(prep).await; - if let Some(printer) = printer { -diff --git a/lib/src/install.rs b/lib/src/install.rs -index 98857edf..69b5fab5 100644 ---- a/lib/src/install.rs -+++ b/lib/src/install.rs -@@ -744,7 +744,7 @@ async fn install_container( - let spec_imgref = ImageReference::from(src_imageref.clone()); - let repo = &sysroot.repo(); - repo.set_disable_fsync(true); -- let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false).await?; -+ let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false, false).await?; - repo.set_disable_fsync(false); - r - }; + let serialized_manifest = serde_json::to_string(&import.manifest)?; + let serialized_config = serde_json::to_string(&import.config)?; + let mut metadata = HashMap::new(); diff --git a/spec_files/bootc/bootc.spec b/spec_files/bootc/bootc.spec index 2132ed1e..7fb7a82c 100644 --- a/spec_files/bootc/bootc.spec +++ b/spec_files/bootc/bootc.spec @@ -1,7 +1,8 @@ %bcond_without check +%bcond_without ostree_ext Name: bootc -Version: 1.1.2 +Version: 1.1.3 Release: 100.bazzite Summary: Bootable container system @@ -34,6 +35,8 @@ BuildRequires: rust-toolset BuildRequires: cargo-rpm-macros >= 25 %endif BuildRequires: systemd +# For tests +BuildRequires: skopeo ostree # Backing storage tooling https://github.com/containers/composefs/issues/125 Requires: composefs @@ -44,6 +47,11 @@ Requires: podman # For bootloader updates Recommends: bootupd +# A made up provides so that rpm-ostree can depend on it +%if %{with ostree_ext} +Provides: ostree-cli(ostree-container) +%endif + %description %{summary} @@ -59,6 +67,9 @@ Recommends: bootupd %install %make_install INSTALL="install -p -c" +%if %{with ostree_ext} +make install-ostree-hooks DESTDIR=%{?buildroot} +%endif %if %{with check} %check @@ -74,6 +85,9 @@ Recommends: bootupd %{_bindir}/bootc %{_prefix}/lib/bootc/ %{_prefix}/lib/systemd/system-generators/* +%if %{with ostree_ext} +%{_prefix}/libexec/libostree/ext/* +%endif %{_unitdir}/* %{_mandir}/man*/bootc*