From 518435fb3eaf78b6e8c8d18e409076d2de3d6546 Mon Sep 17 00:00:00 2001
From: Antheas Kapenekakis <git@antheas.dev>
Date: Thu, 28 Nov 2024 00:12:26 +0100
Subject: [PATCH] pull: Add --json-fd

This adds a generic "progress" infrastructure for granular, incremental
notifications, currently focused on downloading, but we may extend it to
other tasks in the future.

Signed-off-by: Antheas Kapenekakis <git@antheas.dev>
Signed-off-by: Colin Walters <walters@verbum.org>
---
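A note on consuming the new interface (illustrative; the invocation and values
below are examples, not output from a real run). The fd passed via --json-fd is
wrapped in a tokio::net::unix::pipe::Sender, so it should be the write end of a
pipe; one way to provide that from a shell is process substitution (the fd
number 3 is arbitrary):

  bootc upgrade --json-fd 3 3> >(jq --unbuffered .)

Each line written to the fd is a single JSON object. Given the serde attributes
in progress_jsonl.rs (tag = "type", camelCase field names), a ProgressBytes
event would look roughly like this, with all values made up:

  {"type":"ProgressBytes","apiVersion":"org.containers.bootc.progress/v1","task":"pulling","description":"Pulling Image: sha256:7d5a0c1e9b2f4a6d8c3e5","id":"sha256:7d5a0c1e9b2f4a6d8c3e5","bytesCached":4194304,"bytes":1048576,"bytesTotal":8388608,"stepsCached":1,"steps":2,"stepsTotal":5,"subtasks":[{"subtask":"ostree_chunk","description":"ostree_chunk: 7d5a0c1e9b2f4a6d8c3e5","id":"7d5a0c1e9b2f4a6d8c3e5","bytesCached":0,"bytes":1048576,"bytesTotal":2097152}]}

A minimal consumer sketch in Rust (hypothetical, not part of this patch;
assumes a serde_json dependency and that the pipe's read end is wired to the
program's stdin, e.g. via the process substitution above):

  // Read progress events, one JSON object per line, from stdin.
  use std::io::BufRead;

  fn main() -> std::io::Result<()> {
      for line in std::io::stdin().lock().lines() {
          let line = line?;
          // A real consumer would mirror the Event enum from progress_jsonl.rs;
          // a generic Value is enough for a sketch.
          let event: serde_json::Value =
              serde_json::from_str(&line).expect("each line is valid JSON");
          // ProgressSteps events carry no byte counters and are skipped here.
          if let (Some(bytes), Some(total)) =
              (event["bytes"].as_u64(), event["bytesTotal"].as_u64())
          {
              eprintln!("{}: {bytes}/{total} bytes", event["task"].as_str().unwrap_or("?"));
          }
      }
      Ok(())
  }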
 lib/src/cli.rs                    |  36 +++-
 lib/src/deploy.rs                 | 246 +++++++++++++++++++++++--
 lib/src/install.rs                |  10 +-
 lib/src/lib.rs                    |   1 +
 lib/src/progress_jsonl.rs         | 290 ++++++++++++++++++++++++++++++
 ostree-ext/src/cli.rs             |   1 +
 ostree-ext/src/container/store.rs |   8 +-
 7 files changed, 568 insertions(+), 24 deletions(-)
create mode 100644 lib/src/progress_jsonl.rs
diff --git a/lib/src/cli.rs b/lib/src/cli.rs
index e293fd940..8e429fe93 100644
--- a/lib/src/cli.rs
+++ b/lib/src/cli.rs
@@ -4,6 +4,7 @@
use std::ffi::{CString, OsStr, OsString};
use std::io::Seek;
+use std::os::fd::RawFd;
use std::os::unix::process::CommandExt;
use std::process::Command;
@@ -25,6 +26,8 @@ use serde::{Deserialize, Serialize};
use crate::deploy::RequiredHostSpec;
use crate::lints;
+use crate::progress_jsonl;
+use crate::progress_jsonl::ProgressWriter;
use crate::spec::Host;
use crate::spec::ImageReference;
use crate::utils::sigpolicy_from_opts;
@@ -52,6 +55,10 @@ pub(crate) struct UpgradeOpts {
/// a userspace-only restart.
#[clap(long, conflicts_with = "check")]
pub(crate) apply: bool,
+
+ /// Pipe download progress to this fd in jsonl format.
+ #[clap(long)]
+ pub(crate) json_fd: Option<RawFd>,
}
/// Perform a switch operation
@@ -101,6 +108,10 @@ pub(crate) struct SwitchOpts {
/// Target image to use for the next boot.
pub(crate) target: String,
+
+ /// Pipe download progress to this fd in jsonl format.
+ #[clap(long)]
+ pub(crate) json_fd: Option<RawFd>,
}
/// Options controlling rollback
@@ -644,6 +655,12 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
let (booted_deployment, _deployments, host) =
crate::status::get_status_require_booted(sysroot)?;
let imgref = host.spec.image.as_ref();
+ let prog = opts
+ .json_fd
+ .map(progress_jsonl::ProgressWriter::from_raw_fd)
+ .transpose()?
+ .unwrap_or_default();
+
// If there's no specified image, let's be nice and check if the booted system is using rpm-ostree
if imgref.is_none() {
let booted_incompatible = host
@@ -700,7 +717,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
}
}
} else {
- let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet).await?;
+ let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, prog.clone()).await?;
let staged_digest = staged_image.map(|s| s.digest().expect("valid digest in status"));
let fetched_digest = &fetched.manifest_digest;
tracing::debug!("staged: {staged_digest:?}");
@@ -723,7 +740,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
println!("No update available.")
} else {
let osname = booted_deployment.osname();
- crate::deploy::stage(sysroot, &osname, &fetched, &spec).await?;
+ crate::deploy::stage(sysroot, &osname, &fetched, &spec, prog.clone()).await?;
changed = true;
if let Some(prev) = booted_image.as_ref() {
if let Some(fetched_manifest) = fetched.get_manifest(repo)? {
@@ -759,6 +776,11 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
);
let target = ostree_container::OstreeImageReference { sigverify, imgref };
let target = ImageReference::from(target);
+ let prog = opts
+ .json_fd
+ .map(progress_jsonl::ProgressWriter::from_raw_fd)
+ .transpose()?
+ .unwrap_or_default();
// If we're doing an in-place mutation, we shortcut most of the rest of the work here
if opts.mutate_in_place {
@@ -794,7 +816,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
}
let new_spec = RequiredHostSpec::from_spec(&new_spec)?;
- let fetched = crate::deploy::pull(repo, &target, None, opts.quiet).await?;
+ let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, prog.clone()).await?;
if !opts.retain {
// By default, we prune the previous ostree ref so it will go away after later upgrades
@@ -808,7 +830,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
}
let stateroot = booted_deployment.osname();
- crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec).await?;
+ crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec, prog.clone()).await?;
if opts.apply {
crate::reboot::reboot()?;
@@ -850,18 +872,20 @@ async fn edit(opts: EditOpts) -> Result<()> {
host.spec.verify_transition(&new_host.spec)?;
let new_spec = RequiredHostSpec::from_spec(&new_host.spec)?;
+ let prog = ProgressWriter::default();
+
// We only support two state transitions right now; switching the image,
// or flipping the bootloader ordering.
if host.spec.boot_order != new_host.spec.boot_order {
return crate::deploy::rollback(sysroot).await;
}
- let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet).await?;
+ let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, prog.clone()).await?;
// TODO gc old layers here
let stateroot = booted_deployment.osname();
- crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec).await?;
+ crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec, prog.clone()).await?;
Ok(())
}
diff --git a/lib/src/deploy.rs b/lib/src/deploy.rs
index 960c1abde..7196e2881 100644
--- a/lib/src/deploy.rs
+++ b/lib/src/deploy.rs
@@ -21,6 +21,7 @@ use ostree_ext::ostree::{self, Sysroot};
use ostree_ext::sysroot::SysrootLock;
use ostree_ext::tokio_util::spawn_blocking_cancellable_flatten;
+use crate::progress_jsonl::{Event, ProgressWriter, SubTaskBytes, SubTaskStep, API_VERSION};
use crate::spec::ImageReference;
use crate::spec::{BootOrder, HostSpec};
use crate::status::labels_of_config;
@@ -141,11 +142,20 @@ fn prefix_of_progress(p: &ImportProgress) -> &'static str {
async fn handle_layer_progress_print(
mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
+ digest: Box<str>,
n_layers_to_fetch: usize,
+ layers_total: usize,
+ bytes_to_download: u64,
+ bytes_total: u64,
+ prog: ProgressWriter,
+ quiet: bool,
) {
let start = std::time::Instant::now();
let mut total_read = 0u64;
let bar = indicatif::MultiProgress::new();
+ if quiet {
+ bar.set_draw_target(indicatif::ProgressDrawTarget::hidden());
+ }
let layers_bar = bar.add(indicatif::ProgressBar::new(
n_layers_to_fetch.try_into().unwrap(),
));
@@ -157,7 +167,8 @@ async fn handle_layer_progress_print(
.template("{prefix} {bar} {pos}/{len} {wide_msg}")
.unwrap(),
);
- layers_bar.set_prefix("Fetching layers");
+ let taskname = "Fetching layers";
+ layers_bar.set_prefix(taskname);
layers_bar.set_message("");
byte_bar.set_prefix("Fetching");
byte_bar.set_style(
@@ -167,6 +178,9 @@ async fn handle_layer_progress_print(
)
.unwrap()
);
+
+ let mut subtasks = vec![];
+ let mut subtask: SubTaskBytes = Default::default();
loop {
tokio::select! {
// Always handle layer changes first.
@@ -174,18 +188,44 @@ async fn handle_layer_progress_print(
layer = layers.recv() => {
if let Some(l) = layer {
let layer = descriptor_of_progress(&l);
+ let layer_type = prefix_of_progress(&l);
+ let short_digest = &layer.digest().digest()[0..21];
let layer_size = layer.size();
if l.is_starting() {
+ // Reset the progress bar
byte_bar.reset_elapsed();
byte_bar.reset_eta();
byte_bar.set_length(layer_size);
- let layer_type = prefix_of_progress(&l);
- let short_digest = &layer.digest().digest()[0..21];
byte_bar.set_message(format!("{layer_type} {short_digest}"));
+
+ subtask = SubTaskBytes {
+ subtask: layer_type.into(),
+ description: format!("{layer_type}: {short_digest}").clone().into(),
+ id: format!("{short_digest}").clone().into(),
+ bytes_cached: 0,
+ bytes: 0,
+ bytes_total: layer_size,
+ };
} else {
byte_bar.set_position(layer_size);
layers_bar.inc(1);
total_read = total_read.saturating_add(layer_size);
+ // Emit an event where bytes == total to signal completion.
+ subtask.bytes = layer_size;
+ subtasks.push(subtask.clone());
+ prog.send(Event::ProgressBytes {
+ api_version: API_VERSION.into(),
+ task: "pulling".into(),
+ description: format!("Pulling Image: {digest}").into(),
+ id: (*digest).into(),
+ bytes_cached: bytes_total - bytes_to_download,
+ bytes: total_read,
+ bytes_total: bytes_to_download,
+ steps_cached: (layers_total - n_layers_to_fetch) as u64,
+ steps: layers_bar.position(),
+ steps_total: n_layers_to_fetch as u64,
+ subtasks: subtasks.clone(),
+ }).await;
}
} else {
// If the receiver is disconnected, then we're done
@@ -197,9 +237,26 @@ async fn handle_layer_progress_print(
// If the receiver is disconnected, then we're done
break
}
- let bytes = layer_bytes.borrow();
- if let Some(bytes) = &*bytes {
+ let bytes = {
+ let bytes = layer_bytes.borrow_and_update();
+ bytes.as_ref().cloned()
+ };
+ if let Some(bytes) = bytes {
byte_bar.set_position(bytes.fetched);
+ subtask.bytes = byte_bar.position();
+ prog.send_lossy(Event::ProgressBytes {
+ api_version: API_VERSION.into(),
+ task: "pulling".into(),
+ description: format!("Pulling Image: {digest}").into(),
+ id: (*digest).into(),
+ bytes_cached: bytes_total - bytes_to_download,
+ bytes: total_read + byte_bar.position(),
+ bytes_total: bytes_to_download,
+ steps_cached: (layers_total - n_layers_to_fetch) as u64,
+ steps: layers_bar.position(),
+ steps_total: n_layers_to_fetch as u64,
+ subtasks: subtasks.clone().into_iter().chain([subtask.clone()]).collect(),
+ }).await;
}
}
}
@@ -221,6 +278,27 @@ async fn handle_layer_progress_print(
)) {
tracing::warn!("writing to stdout: {e}");
}
+
+ // Since the progress notifier was closed, we know the fetch finished and the import has started;
+ // use that as a heuristic to begin reporting import progress.
+ // This event cannot be lossy, or it would be dropped.
+ prog.send(Event::ProgressSteps {
+ api_version: API_VERSION.into(),
+ task: "importing".into(),
+ description: "Importing Image".into(),
+ id: (*digest).into(),
+ steps_cached: 0,
+ steps: 0,
+ steps_total: 1,
+ subtasks: [SubTaskStep {
+ subtask: "importing".into(),
+ description: "Importing Image".into(),
+ id: "importing".into(),
+ completed: false,
+ }]
+ .into(),
+ })
+ .await;
}
/// Wrapper for pulling a container image, wiring up status output.
@@ -230,6 +308,7 @@ pub(crate) async fn pull(
imgref: &ImageReference,
target_imgref: Option<&OstreeImageReference>,
quiet: bool,
+ prog: ProgressWriter,
) -> Result<Box<ImageState>> {
let ostree_imgref = &OstreeImageReference::from(imgref.clone());
let mut imp = new_importer(repo, ostree_imgref).await?;
@@ -250,20 +329,52 @@ pub(crate) async fn pull(
ostree_ext::cli::print_layer_status(&prep);
let layers_to_fetch = prep.layers_to_fetch().collect::<Result<Vec<_>>>()?;
let n_layers_to_fetch = layers_to_fetch.len();
- let printer = (!quiet).then(|| {
- let layer_progress = imp.request_progress();
- let layer_byte_progress = imp.request_layer_progress();
- tokio::task::spawn(async move {
- handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch)
- .await
- })
+ let layers_total = prep.all_layers().count();
+ let bytes_to_fetch: u64 = layers_to_fetch.iter().map(|(l, _)| l.layer.size()).sum();
+ let bytes_total: u64 = prep.all_layers().map(|l| l.layer.size()).sum();
+
+ let prog_print = prog.clone();
+ let digest = prep.manifest_digest.clone();
+ let digest_imp = prep.manifest_digest.clone();
+ let layer_progress = imp.request_progress();
+ let layer_byte_progress = imp.request_layer_progress();
+ let printer = tokio::task::spawn(async move {
+ handle_layer_progress_print(
+ layer_progress,
+ layer_byte_progress,
+ digest.as_ref().into(),
+ n_layers_to_fetch,
+ layers_total,
+ bytes_to_fetch,
+ bytes_total,
+ prog_print,
+ quiet,
+ )
+ .await
});
let import = imp.import(prep).await;
- if let Some(printer) = printer {
- let _ = printer.await;
- }
+ let _ = printer.await;
+ // Both the progress task and the import future have completed, so the import is done.
+ prog.send(Event::ProgressSteps {
+ api_version: API_VERSION.into(),
+ task: "importing".into(),
+ description: "Importing Image".into(),
+ id: digest_imp.clone().as_ref().into(),
+ steps_cached: 0,
+ steps: 1,
+ steps_total: 1,
+ subtasks: [SubTaskStep {
+ subtask: "importing".into(),
+ description: "Importing Image".into(),
+ id: "importing".into(),
+ completed: true,
+ }]
+ .into(),
+ })
+ .await;
let import = import?;
let wrote_imgref = target_imgref.as_ref().unwrap_or(&ostree_imgref);
+
if let Some(msg) =
ostree_container::store::image_filtered_content_warning(repo, &wrote_imgref.imgref)
.context("Image content warning")?
@@ -450,8 +561,53 @@ pub(crate) async fn stage(
stateroot: &str,
image: &ImageState,
spec: &RequiredHostSpec<'_>,
+ prog: ProgressWriter,
) -> Result<()> {
+ let mut subtask = SubTaskStep {
+ subtask: "merging".into(),
+ description: "Merging Image".into(),
+ id: "fetching".into(),
+ completed: false,
+ };
+ let mut subtasks = vec![];
+ prog.send(Event::ProgressSteps {
+ api_version: API_VERSION.into(),
+ task: "staging".into(),
+ description: "Deploying Image".into(),
+ id: image.manifest_digest.clone().as_ref().into(),
+ steps_cached: 0,
+ steps: 0,
+ steps_total: 3,
+ subtasks: subtasks
+ .clone()
+ .into_iter()
+ .chain([subtask.clone()])
+ .collect(),
+ })
+ .await;
let merge_deployment = sysroot.merge_deployment(Some(stateroot));
+
+ subtask.completed = true;
+ subtasks.push(subtask.clone());
+ subtask.subtask = "deploying".into();
+ subtask.id = "deploying".into();
+ subtask.description = "Deploying Image".into();
+ subtask.completed = false;
+ prog.send(Event::ProgressSteps {
+ api_version: API_VERSION.into(),
+ task: "staging".into(),
+ description: "Deploying Image".into(),
+ id: image.manifest_digest.clone().as_ref().into(),
+ steps_cached: 0,
+ steps: 1,
+ steps_total: 3,
+ subtasks: subtasks
+ .clone()
+ .into_iter()
+ .chain([subtask.clone()])
+ .collect(),
+ })
+ .await;
let origin = origin_from_imageref(spec.image)?;
let deployment = crate::deploy::deploy(
sysroot,
@@ -462,8 +618,50 @@ pub(crate) async fn stage(
)
.await?;
+ subtask.completed = true;
+ subtasks.push(subtask.clone());
+ subtask.subtask = "bound_images".into();
+ subtask.id = "bound_images".into();
+ subtask.description = "Pulling Bound Images".into();
+ subtask.completed = false;
+ prog.send(Event::ProgressSteps {
+ api_version: API_VERSION.into(),
+ task: "staging".into(),
+ description: "Deploying Image".into(),
+ id: image.manifest_digest.clone().as_ref().into(),
+ steps_cached: 0,
+ steps: 1,
+ steps_total: 3,
+ subtasks: subtasks
+ .clone()
+ .into_iter()
+ .chain([subtask.clone()])
+ .collect(),
+ })
+ .await;
crate::boundimage::pull_bound_images(sysroot, &deployment).await?;
+ subtask.completed = true;
+ subtasks.push(subtask.clone());
+ subtask.subtask = "cleanup".into();
+ subtask.id = "cleanup".into();
+ subtask.description = "Removing old images".into();
+ subtask.completed = false;
+ prog.send(Event::ProgressSteps {
+ api_version: API_VERSION.into(),
+ task: "staging".into(),
+ description: "Deploying Image".into(),
+ id: image.manifest_digest.clone().as_ref().into(),
+ steps_cached: 0,
+ steps: 2,
+ steps_total: 3,
+ subtasks: subtasks
+ .clone()
+ .into_iter()
+ .chain([subtask.clone()])
+ .collect(),
+ })
+ .await;
crate::deploy::cleanup(sysroot).await?;
println!("Queued for next boot: {:#}", spec.image);
if let Some(version) = image.version.as_deref() {
@@ -471,6 +669,24 @@ pub(crate) async fn stage(
}
println!(" Digest: {}", image.manifest_digest);
+ subtask.completed = true;
+ subtasks.push(subtask.clone());
+ prog.send(Event::ProgressSteps {
+ api_version: API_VERSION.into(),
+ task: "staging".into(),
+ description: "Deploying Image".into(),
+ id: image.manifest_digest.clone().as_ref().into(),
+ steps_cached: 0,
+ steps: 3,
+ steps_total: 3,
+ subtasks: subtasks
+ .clone()
+ .into_iter()
+ .chain([subtask.clone()])
+ .collect(),
+ })
+ .await;
+
Ok(())
}
diff --git a/lib/src/install.rs b/lib/src/install.rs
index 548c3e8d3..2834ae44f 100644
--- a/lib/src/install.rs
+++ b/lib/src/install.rs
@@ -48,6 +48,7 @@ use crate::boundimage::{BoundImage, ResolvedBoundImage};
use crate::containerenv::ContainerExecutionInfo;
use crate::lsm;
use crate::mount::Filesystem;
+use crate::progress_jsonl::ProgressWriter;
use crate::spec::ImageReference;
use crate::store::Storage;
use crate::task::Task;
@@ -733,7 +734,14 @@ async fn install_container(
let spec_imgref = ImageReference::from(src_imageref.clone());
let repo = &sysroot.repo();
repo.set_disable_fsync(true);
- let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false).await?;
+ let r = crate::deploy::pull(
+ repo,
+ &spec_imgref,
+ Some(&state.target_imgref),
+ false,
+ ProgressWriter::default(),
+ )
+ .await?;
repo.set_disable_fsync(false);
r
};
diff --git a/lib/src/lib.rs b/lib/src/lib.rs
index 1f0c263b5..f90ef3665 100644
--- a/lib/src/lib.rs
+++ b/lib/src/lib.rs
@@ -41,3 +41,4 @@ pub mod spec;
mod docgen;
mod glyph;
mod imgstorage;
+mod progress_jsonl;
diff --git a/lib/src/progress_jsonl.rs b/lib/src/progress_jsonl.rs
new file mode 100644
index 000000000..8e97a7631
--- /dev/null
+++ b/lib/src/progress_jsonl.rs
@@ -0,0 +1,290 @@
+//! Output progress data using the json-lines format. For more information
+//! see <https://jsonlines.org/>.
+
+use anyhow::Result;
+use fn_error_context::context;
+use serde::Serialize;
+use std::borrow::Cow;
+use std::os::fd::{FromRawFd, OwnedFd, RawFd};
+use std::sync::Arc;
+use std::time::Instant;
+use tokio::io::{AsyncWriteExt, BufWriter};
+use tokio::net::unix::pipe::Sender;
+use tokio::sync::Mutex;
+
+// Maximum number of times per second that an event will be written.
+const REFRESH_HZ: u16 = 5;
+
+pub const API_VERSION: &str = "org.containers.bootc.progress/v1";
+
+/// An incremental update to e.g. a container image layer download.
+/// The first time a given "subtask" name is seen, a new progress bar should be created.
+/// If bytes == bytes_total, then the subtask is considered complete.
+#[derive(Debug, serde::Serialize, serde::Deserialize, Default, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct SubTaskBytes<'t> {
+ /// A machine readable type for the task (used for i18n).
+ /// (e.g., "ostree_chunk", "ostree_derived")
+ #[serde(borrow)]
+ pub subtask: Cow<'t, str>,
+ /// A human readable description of the task if i18n is not available.
+ /// (e.g., "OSTree Chunk:", "Derived Layer:")
+ #[serde(borrow)]
+ pub description: Cow<'t, str>,
+ /// A human and machine readable identifier for the task
+ /// (e.g., ostree chunk/layer hash).
+ #[serde(borrow)]
+ pub id: Cow<'t, str>,
+ /// The number of bytes fetched by a previous run (e.g., zstd_chunked).
+ pub bytes_cached: u64,
+ /// Updated byte level progress
+ pub bytes: u64,
+ /// Total number of bytes
+ pub bytes_total: u64,
+}
+
+/// Marks the beginning and end of a discrete step.
+#[derive(Debug, serde::Serialize, serde::Deserialize, Default, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct SubTaskStep<'t> {
+ /// A machine readable type for the task (used for i18n).
+ /// (e.g., "ostree_chunk", "ostree_derived")
+ #[serde(borrow)]
+ pub subtask: Cow<'t, str>,
+ /// A human readable description of the task if i18n is not available.
+ /// (e.g., "OSTree Chunk:", "Derived Layer:")
+ #[serde(borrow)]
+ pub description: Cow<'t, str>,
+ /// A human and machine readable identifier for the task
+ /// (e.g., ostree chunk/layer hash).
+ #[serde(borrow)]
+ pub id: Cow<'t, str>,
+ /// Starts as false when beginning to execute and turns true when completed.
+ pub completed: bool,
+}
+
+/// An event emitted as JSON.
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+#[serde(
+ tag = "type",
+ rename_all = "PascalCase",
+ rename_all_fields = "camelCase"
+)]
+pub enum Event<'t> {
+ /// An incremental update to a container image layer download
+ ProgressBytes {
+ /// The version of the progress event format.
+ #[serde(borrow)]
+ api_version: Cow<'t, str>,
+ /// A machine readable type (e.g., pulling) for the task (used for i18n
+ /// and UI customization).
+ #[serde(borrow)]
+ task: Cow<'t, str>,
+ /// A human readable description of the task if i18n is not available.
+ #[serde(borrow)]
+ description: Cow<'t, str>,
+ /// A human and machine readable unique identifier for the task
+ /// (e.g., the image name). For tasks that only happen once,
+ /// it can be set to the same value as task.
+ #[serde(borrow)]
+ id: Cow<'t, str>,
+ /// The number of bytes fetched by a previous run.
+ bytes_cached: u64,
+ /// The number of bytes already fetched.
+ bytes: u64,
+ /// Total number of bytes. If zero, then this should be considered "unspecified".
+ bytes_total: u64,
+ /// The number of steps fetched by a previous run.
+ steps_cached: u64,
+ /// The initial position of progress.
+ steps: u64,
+ /// The total number of steps (e.g. container image layers, RPMs)
+ steps_total: u64,
+ /// The currently running subtasks.
+ subtasks: Vec<SubTaskBytes<'t>>,
+ },
+ /// An incremental update with discrete steps
+ ProgressSteps {
+ /// The version of the progress event format.
+ #[serde(borrow)]
+ api_version: Cow<'t, str>,
+ /// A machine readable type (e.g., pulling) for the task (used for i18n
+ /// and UI customization).
+ #[serde(borrow)]
+ task: Cow<'t, str>,
+ /// A human readable description of the task if i18n is not available.
+ #[serde(borrow)]
+ description: Cow<'t, str>,
+ /// A human and machine readable unique identifier for the task
+ /// (e.g., the image name). For tasks that only happen once,
+ /// it can be set to the same value as task.
+ #[serde(borrow)]
+ id: Cow<'t, str>,
+ /// The number of steps fetched by a previous run.
+ steps_cached: u64,
+ /// The initial position of progress.
+ steps: u64,
+ /// The total number of steps (e.g. container image layers, RPMs)
+ steps_total: u64,
+ /// The currently running subtasks.
+ subtasks: Vec<SubTaskStep<'t>>,
+ },
+}
+
+#[derive(Debug)]
+struct ProgressWriterInner {
+ last_write: Option<std::time::Instant>,
+ fd: BufWriter<Sender>,
+}
+
+#[derive(Clone, Debug, Default)]
+pub(crate) struct ProgressWriter {
+ inner: Arc<Mutex<Option<ProgressWriterInner>>>,
+}
+
+impl TryFrom<OwnedFd> for ProgressWriter {
+ type Error = anyhow::Error;
+
+ fn try_from(value: OwnedFd) -> Result<Self> {
+ let value = Sender::from_owned_fd(value)?;
+ Ok(Self::from(value))
+ }
+}
+
+impl From<Sender> for ProgressWriter {
+ fn from(value: Sender) -> Self {
+ let inner = ProgressWriterInner {
+ last_write: None,
+ fd: BufWriter::new(value),
+ };
+ Self {
+ inner: Arc::new(Some(inner).into()),
+ }
+ }
+}
+
+impl ProgressWriter {
+ /// Given a raw file descriptor, create an instance of a json-lines writer.
+ #[allow(unsafe_code)]
+ #[context("Creating progress writer")]
+ pub(crate) fn from_raw_fd(fd: RawFd) -> Result<Self> {
+ unsafe { OwnedFd::from_raw_fd(fd) }.try_into()
+ }
+
+ /// Serialize the target object to JSON as a single line
+ pub(crate) async fn send_impl<T: Serialize>(&self, v: T, required: bool) -> Result<()> {
+ let mut guard = self.inner.lock().await;
+ // Check if we have an inner value; if not, nothing to do.
+ let Some(inner) = guard.as_mut() else {
+ return Ok(());
+ };
+
+ // For messages that can be dropped, if we already sent an update within this cycle, discard this one.
+ // TODO: Also consider querying the pipe buffer and dropping the event if we can't complete this write.
+ let now = Instant::now();
+ if !required {
+ const REFRESH_MS: u32 = 1000 / REFRESH_HZ as u32;
+ if let Some(elapsed) = inner.last_write.map(|w| now.duration_since(w)) {
+ if elapsed.as_millis() < REFRESH_MS.into() {
+ return Ok(());
+ }
+ }
+ }
+
+ // SAFETY: Propagating panics from the mutex here is intentional
+ // serde is guaranteed not to output newlines here
+ let buf = serde_json::to_vec(&v)?;
+ inner.fd.write_all(&buf).await?;
+ // We always end in a newline
+ inner.fd.write_all(b"\n").await?;
+ // And flush to ensure the remote side sees updates immediately
+ inner.fd.flush().await?;
+ // Update the last write time
+ inner.last_write = Some(now);
+ Ok(())
+ }
+
+ /// Send an event.
+ pub(crate) async fn send<T: Serialize>(&self, v: T) {
+ if let Err(e) = self.send_impl(v, true).await {
+ eprintln!("Failed to write to jsonl: {}", e);
+ // Stop writing to fd but let process continue
+ // SAFETY: Propagating panics from the mutex here is intentional
+ let _ = self.inner.lock().await.take();
+ }
+ }
+
+ /// Send an event that can be dropped.
+ pub(crate) async fn send_lossy<T: Serialize>(&self, v: T) {
+ if let Err(e) = self.send_impl(v, false).await {
+ eprintln!("Failed to write to jsonl: {}", e);
+ // Stop writing to fd but let process continue
+ // SAFETY: Propagating panics from the mutex here is intentional
+ let _ = self.inner.lock().await.take();
+ }
+ }
+
+ /// Flush remaining data and return the underlying file.
+ #[allow(dead_code)]
+ pub(crate) async fn into_inner(self) -> Result<Option<Sender>> {
+ // SAFETY: Propagating panics from the mutex here is intentional
+ let mut mutex = self.inner.lock().await;
+ if let Some(inner) = mutex.take() {
+ Ok(Some(inner.fd.into_inner()))
+ } else {
+ Ok(None)
+ }
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use serde::Deserialize;
+ use tokio::io::{AsyncBufReadExt, BufReader};
+
+ use super::*;
+
+ #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
+ struct S {
+ s: String,
+ v: u32,
+ }
+
+ #[tokio::test]
+ async fn test_jsonl() -> Result<()> {
+ let testvalues = [
+ S {
+ s: "foo".into(),
+ v: 42,
+ },
+ S {
+ // Test with an embedded newline to sanity check that serde doesn't write it literally
+ s: "foo\nbar".into(),
+ v: 0,
+ },
+ ];
+ let (send, recv) = tokio::net::unix::pipe::pipe()?;
+ let testvalues_sender = &testvalues;
+ let sender = async move {
+ let w = ProgressWriter::try_from(send)?;
+ for value in testvalues_sender {
+ w.send(value).await;
+ }
+ anyhow::Ok(())
+ };
+ let testvalues = &testvalues;
+ let receiver = async move {
+ let tf = BufReader::new(recv);
+ let mut expected = testvalues.iter();
+ let mut lines = tf.lines();
+ while let Some(line) = lines.next_line().await? {
+ let found: S = serde_json::from_str(&line)?;
+ let expected = expected.next().unwrap();
+ assert_eq!(&found, expected);
+ }
+ anyhow::Ok(())
+ };
+ tokio::try_join!(sender, receiver)?;
+ Ok(())
+ }
+}
diff --git a/ostree-ext/src/cli.rs b/ostree-ext/src/cli.rs
index fa60ccf41..e63e4240e 100644
--- a/ostree-ext/src/cli.rs
+++ b/ostree-ext/src/cli.rs
@@ -657,6 +657,7 @@ pub async fn handle_layer_progress_print(
pub fn print_layer_status(prep: &PreparedImport) {
if let Some(status) = prep.format_layer_status() {
println!("{status}");
+ let _ = std::io::stdout().flush();
}
}
diff --git a/ostree-ext/src/container/store.rs b/ostree-ext/src/container/store.rs
index d4e4a4fec..b2a57a86a 100644
--- a/ostree-ext/src/container/store.rs
+++ b/ostree-ext/src/container/store.rs
@@ -102,7 +102,7 @@ impl ImportProgress {
}
/// Sent across a channel to track the byte-level progress of a layer fetch.
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub struct LayerProgress {
/// Index of the layer in the manifest
pub layer_index: usize,
@@ -193,7 +193,7 @@ pub enum PrepareResult {
#[derive(Debug)]
pub struct ManifestLayerState {
/// The underlying layer descriptor.
- pub(crate) layer: oci_image::Descriptor,
+ pub layer: oci_image::Descriptor,
// TODO semver: Make this readonly via an accessor
/// The ostree ref name for this layer.
pub ostree_ref: String,
@@ -952,6 +952,10 @@ impl ImageImporter {
proxy.finalize().await?;
tracing::debug!("finalized proxy");
+ // Disconnect progress notifiers to signal we're done with fetching.
+ let _ = self.layer_byte_progress.take();
+ let _ = self.layer_progress.take();
+
let serialized_manifest = serde_json::to_string(&import.manifest)?;
let serialized_config = serde_json::to_string(&import.config)?;
let mut metadata = HashMap::new();