From 5bec48e1d53009912ff4ee66358d6cd4e174d452 Mon Sep 17 00:00:00 2001 From: David Gilligan-Cook Date: Mon, 24 Jul 2023 11:13:53 -0700 Subject: [PATCH] Adds external key-value string data storage to runtime via Annoations objects in Layers. Adds --annotation argument to spfs run/shell, --get and --get-all arguments to spfs info, spfs runtime info, and adds support for reading the value from an Annotation using spfs read. Signed-off-by: David Gilligan-Cook --- Cargo.lock | 4 + crates/spfs-cli/cmd-enter/src/cmd_enter.rs | 1 + crates/spfs-cli/cmd-render/src/cmd_render.rs | 6 +- crates/spfs-cli/common/Cargo.toml | 1 + crates/spfs-cli/common/src/args.rs | 59 +- crates/spfs-cli/common/src/lib.rs | 2 +- crates/spfs-cli/main/Cargo.toml | 5 + crates/spfs-cli/main/src/cmd_info.rs | 30 +- crates/spfs-cli/main/src/cmd_run.rs | 120 ++++- crates/spfs-cli/main/src/cmd_run_test.rs | 198 +++++++ crates/spfs-cli/main/src/cmd_runtime_info.rs | 10 + crates/spfs-cli/main/src/cmd_shell.rs | 5 + crates/spfs-cli/main/src/fixtures.rs | 10 + crates/spfs-proto/schema/spfs.fbs | 24 +- crates/spfs-proto/src/spfs_generated.rs | 503 +++++++++++++++++- crates/spfs/src/check.rs | 83 ++- crates/spfs/src/config.rs | 15 + crates/spfs/src/find_path.rs | 21 +- crates/spfs/src/graph/annotation.rs | 191 +++++++ crates/spfs/src/graph/annotation_test.rs | 28 + crates/spfs/src/graph/layer.rs | 148 +++++- crates/spfs/src/graph/layer_test.rs | 64 ++- crates/spfs/src/graph/mod.rs | 8 +- crates/spfs/src/graph/object.rs | 12 +- crates/spfs/src/proto/conversions.rs | 65 ++- crates/spfs/src/proto/defs/types.proto | 9 + crates/spfs/src/resolve.rs | 44 +- crates/spfs/src/runtime/mod.rs | 1 + crates/spfs/src/runtime/storage.rs | 222 +++++++- crates/spfs/src/runtime/storage_test.rs | 287 +++++++++- crates/spfs/src/storage/fs/renderer_unix.rs | 28 +- crates/spfs/src/storage/fs/renderer_win.rs | 28 +- crates/spfs/src/sync.rs | 92 +++- crates/spk-build/src/build/binary.rs | 10 +- crates/spk-build/src/build/binary_test.rs | 15 +- .../cmd-build/src/cmd_build_test/mod.rs | 12 +- crates/spk-cli/cmd-du/src/cmd_du.rs | 8 +- crates/spk-exec/src/exec.rs | 6 +- crates/spk-storage/src/fixtures.rs | 2 +- crates/spk-storage/src/storage/runtime.rs | 21 +- 40 files changed, 2288 insertions(+), 110 deletions(-) create mode 100644 crates/spfs-cli/main/src/cmd_run_test.rs create mode 100644 crates/spfs-cli/main/src/fixtures.rs create mode 100644 crates/spfs/src/graph/annotation.rs create mode 100644 crates/spfs/src/graph/annotation_test.rs diff --git a/Cargo.lock b/Cargo.lock index 3a132c3a85..61befa68a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3541,6 +3541,7 @@ dependencies = [ "sentry-miette", "sentry-tracing", "serde_json", + "serde_yaml 0.9.27", "spfs", "strip-ansi-escapes 0.2.0", "syslog-tracing", @@ -3612,10 +3613,13 @@ dependencies = [ "number_prefix", "procfs", "relative-path", + "rstest 0.18.2", "serde_json", + "serde_yaml 0.9.27", "spfs", "spfs-cli-common", "strum", + "tempfile", "tokio", "tokio-stream", "tonic", diff --git a/crates/spfs-cli/cmd-enter/src/cmd_enter.rs b/crates/spfs-cli/cmd-enter/src/cmd_enter.rs index 6d272ce6d6..bdd5493d66 100644 --- a/crates/spfs-cli/cmd-enter/src/cmd_enter.rs +++ b/crates/spfs-cli/cmd-enter/src/cmd_enter.rs @@ -175,6 +175,7 @@ impl CmdEnter { // For example, the monitor may need to run fusermount to clean up // fuse filesystems within the runtime before shutting down. tracing::debug!("initializing runtime {owned:#?}"); + let start_time = Instant::now(); let render_summary = spfs::initialize_runtime(&mut owned).await?; self.report_render_summary(render_summary, start_time.elapsed().as_secs_f64()); diff --git a/crates/spfs-cli/cmd-render/src/cmd_render.rs b/crates/spfs-cli/cmd-render/src/cmd_render.rs index 51e97c69c5..510f09e1ae 100644 --- a/crates/spfs-cli/cmd-render/src/cmd_render.rs +++ b/crates/spfs-cli/cmd-render/src/cmd_render.rs @@ -156,7 +156,11 @@ impl CmdRender { &render_summary_reporter as &dyn spfs::storage::fs::RenderReporter, ]), ); - let stack = layers.into_iter().map(|l| *l.manifest()).collect(); + let stack = layers + .into_iter() + .filter_map(|l| l.manifest().copied()) + .collect(); + tracing::trace!("stack: {:?}", stack); renderer .render(&stack, self.strategy) .await diff --git a/crates/spfs-cli/common/Cargo.toml b/crates/spfs-cli/common/Cargo.toml index 4f73499753..1b90c657c8 100644 --- a/crates/spfs-cli/common/Cargo.toml +++ b/crates/spfs-cli/common/Cargo.toml @@ -27,6 +27,7 @@ sentry = { workspace = true, optional = true } sentry-miette = { workspace = true, optional = true } sentry-tracing = { workspace = true, optional = true } serde_json = { version = "1.0.57", optional = true } +serde_yaml = { workspace = true } spfs = { workspace = true } strip-ansi-escapes = { workspace = true, optional = true } syslog-tracing = "0.2.0" diff --git a/crates/spfs-cli/common/src/args.rs b/crates/spfs-cli/common/src/args.rs index e16d380fbc..0a85bfead0 100644 --- a/crates/spfs-cli/common/src/args.rs +++ b/crates/spfs-cli/common/src/args.rs @@ -7,9 +7,10 @@ use std::panic::catch_unwind; #[cfg(feature = "sentry")] use std::sync::Mutex; -use miette::Error; +use miette::{Error, IntoDiagnostic, Result, WrapErr}; #[cfg(feature = "sentry")] use once_cell::sync::OnceCell; +use spfs::io::Pluralize; use spfs::storage::LocalRepository; use tracing_subscriber::prelude::*; @@ -415,6 +416,62 @@ impl Logging { } } +/// Command line flags for viewing annotations in a runtime +#[derive(Debug, Clone, clap::Args)] +pub struct AnnotationViewing { + /// Output the data value for the given annotation key(s) from + /// the active runtime. Each value is printed on its own line + /// without its key. + #[clap(long, alias = "annotation")] + pub get: Option>, + + /// Output all the annotation keys and values from the active + /// runtime as a yaml dictionary + #[clap(long, alias = "all-annotations")] + pub get_all: bool, +} + +impl AnnotationViewing { + /// Display annotation values based on the command line arguments + pub async fn print_data(&self, runtime: &spfs::runtime::Runtime) -> Result<()> { + if self.get_all { + let data = runtime.all_annotations().await?; + let keys = data + .keys() + .map(ToString::to_string) + .collect::>(); + let num_keys = keys.len(); + tracing::debug!( + "{num_keys} annotation {}: {}", + "key".pluralize(num_keys), + keys.join(", ") + ); + println!( + "{}", + serde_yaml::to_string(&data) + .into_diagnostic() + .wrap_err("Failed to generate yaml output")? + ); + } else if let Some(keys) = &self.get { + tracing::debug!("--get these keys: {}", keys.join(", ")); + for key in keys.iter() { + match runtime.annotation(key).await? { + Some(value) => { + tracing::debug!("{key} = {value}"); + println!("{value}"); + } + None => { + tracing::warn!("No annotation stored under: {key}"); + println!(); + } + } + } + } + + Ok(()) + } +} + /// Trait all spfs cli command parsers must implement to provide the /// name of the spfs command that has been parsed. This method will be /// called when configuring sentry. diff --git a/crates/spfs-cli/common/src/lib.rs b/crates/spfs-cli/common/src/lib.rs index 3027811d04..2bcab07dca 100644 --- a/crates/spfs-cli/common/src/lib.rs +++ b/crates/spfs-cli/common/src/lib.rs @@ -11,6 +11,6 @@ pub mod __private { pub use {libc, spfs}; } -pub use args::{capture_if_relevant, CommandName, Logging, Render, Sync}; +pub use args::{capture_if_relevant, AnnotationViewing, CommandName, Logging, Render, Sync}; #[cfg(feature = "sentry")] pub use args::{configure_sentry, shutdown_sentry}; diff --git a/crates/spfs-cli/main/Cargo.toml b/crates/spfs-cli/main/Cargo.toml index aefa88de24..0727733454 100644 --- a/crates/spfs-cli/main/Cargo.toml +++ b/crates/spfs-cli/main/Cargo.toml @@ -32,6 +32,7 @@ nix = { workspace = true, features = ["signal"] } number_prefix = "*" # we hope to match versions with indicatif relative-path = { workspace = true } serde_json = { workspace = true } +serde_yaml = { workspace = true } spfs = { workspace = true } spfs-cli-common = { workspace = true } strum = { workspace = true, features = ["derive"] } @@ -52,3 +53,7 @@ features = [ "Win32_System_SystemInformation", "Win32_System_Threading", ] + +[dev-dependencies] +rstest = { workspace = true } +tempfile = { workspace = true } diff --git a/crates/spfs-cli/main/src/cmd_info.rs b/crates/spfs-cli/main/src/cmd_info.rs index 33b4a9a67d..b3a7348c75 100644 --- a/crates/spfs-cli/main/src/cmd_info.rs +++ b/crates/spfs-cli/main/src/cmd_info.rs @@ -9,6 +9,7 @@ use colored::*; use miette::Result; use spfs::env::SPFS_DIR; use spfs::find_path::ObjectPathEntry; +use spfs::graph::Annotation; use spfs::io::{self, DigestFormat, Pluralize}; use spfs::prelude::*; use spfs::{self}; @@ -20,6 +21,9 @@ pub struct CmdInfo { #[clap(flatten)] logging: cli::Logging, + #[clap(flatten)] + annotation: cli::AnnotationViewing, + /// Lists file sizes in human readable format #[clap(long, short = 'H')] human_readable: bool, @@ -139,10 +143,28 @@ impl CmdInfo { println!( " {} {}", "manifest:".bright_blue(), - self.format_digest(*obj.manifest(), repo).await? + match obj.manifest() { + None => "None".to_string(), + Some(manifest_digest) => self.format_digest(*manifest_digest, repo).await?, + } ); + + let annotations = obj.annotations(); + if annotations.is_empty() { + println!(" {} none", "annotations:".bright_blue()); + } else { + for data in annotations { + let annotation: Annotation = data.into(); + println!(" {}", "annotations:".bright_blue()); + println!(" {} {}", "key:".bright_blue(), annotation.key()); + println!(" {} {}", "value:".bright_blue(), annotation.value()); + } + } + if self.follow { - self.to_process.push_back(obj.manifest().to_string()); + if let Some(manifest_digest) = obj.manifest() { + self.to_process.push_back(manifest_digest.to_string()); + } } } @@ -199,6 +221,10 @@ impl CmdInfo { async fn print_global_info(&self, repo: &spfs::storage::RepositoryHandle) -> Result<()> { let runtime = spfs::active_runtime().await?; + if self.annotation.get_all || self.annotation.get.is_some() { + return self.annotation.print_data(&runtime).await; + } + println!("{}:", "Active Runtime".green()); println!(" {}: {}", "id".bright_blue(), runtime.name()); println!(" {}: {}", "editable".bright_blue(), runtime.status.editable); diff --git a/crates/spfs-cli/main/src/cmd_run.rs b/crates/spfs-cli/main/src/cmd_run.rs index b33e4adf6c..201338d4bc 100644 --- a/crates/spfs-cli/main/src/cmd_run.rs +++ b/crates/spfs-cli/main/src/cmd_run.rs @@ -2,15 +2,110 @@ // SPDX-License-Identifier: Apache-2.0 // https://github.com/imageworks/spk +use std::collections::BTreeMap; use std::ffi::OsString; +use std::io; use std::time::Instant; use clap::{ArgGroup, Args}; -use miette::{Context, Result}; +use miette::{miette, Context, IntoDiagnostic, Result}; +use spfs::graph::object::EncodingFormat; use spfs::prelude::*; +use spfs::runtime::KeyValuePair; +use spfs::storage::FromConfig; use spfs::tracking::EnvSpec; use spfs_cli_common as cli; +#[cfg(test)] +#[path = "./cmd_run_test.rs"] +mod cmd_run_test; +#[cfg(test)] +#[path = "./fixtures.rs"] +mod fixtures; + +#[derive(Args, Clone, Debug)] +pub struct Annotation { + /// Adds annotation key-value string data to the new runtime. + /// + /// This allows external processes to store arbitrary data in the + /// runtimes they create. This is most useful with durable runtimes. + /// The data can be retrieved by running `spfs runtime info` or + /// `spfs info` and using the `--get ` or `--get-all` options + /// + /// Annotation data is specified as key-value string pairs + /// separated by either an equals sign or colon (--annotation + /// name=value --annotation other:value). Multiple pairs of + /// annotation data can also be specified at once in yaml or json + /// format (--annotation '{name: value, other: value}'). + /// + /// Annotation data can also be given in a json or yaml file, by + /// using the `--annotation-file ` argument. If given, + /// `--annotation` arguments will supersede anything given in + /// annotation files. + /// + /// If the same key is used more than once, the last key-value pair + /// will override the earlier values for the same key. + #[clap(long, value_name = "KEY:VALUE")] + pub annotation: Vec, + + /// Specify annotation key-value data from a json or yaml file + /// (see --annotation) + #[clap(long)] + pub annotation_file: Vec, +} + +impl Annotation { + /// Returns a list of annotation key-value pairs gathered from all + /// the annotation related command line arguments. The same keys, + /// and values, can appear multiple times in the list if specified + /// multiple times in various command line arguments. + pub fn get_data(&self) -> Result> { + let mut data: Vec = Vec::new(); + + for filename in self.annotation_file.iter() { + let reader: Box = + if Ok("-".to_string()) == filename.clone().into_os_string().into_string() { + // Treat '-' as "read from stdin" + Box::new(io::stdin()) + } else { + Box::new( + std::fs::File::open(filename) + .into_diagnostic() + .wrap_err(format!("Failed to open annotation file: {filename:?}"))?, + ) + }; + let annotation: BTreeMap = serde_yaml::from_reader(reader) + .into_diagnostic() + .wrap_err(format!( + "Failed to parse as annotation data key-value pairs: {filename:?}" + ))?; + data.extend(annotation); + } + + for pair in self.annotation.iter() { + let pair = pair.trim(); + if pair.starts_with('{') { + let given: BTreeMap = serde_yaml::from_str(pair) + .into_diagnostic() + .wrap_err("--annotation value looked like yaml, but could not be parsed")?; + data.extend(given); + continue; + } + + let (name, value) = pair + .split_once('=') + .or_else(|| pair.split_once(':')) + .ok_or_else(|| { + miette!("Invalid option: -annotation {pair} (should be in the form name=value)") + })?; + + data.push((name.to_string(), value.to_string())); + } + + Ok(data) + } +} + /// Run a program in a configured spfs environment #[derive(Debug, Args)] #[clap(group( @@ -51,6 +146,9 @@ pub struct CmdRun { #[clap(long, value_name = "RUNTIME_NAME")] pub rerun: Option, + #[clap(flatten)] + pub annotation: Annotation, + /// The tag or id of the desired runtime /// /// Use '-' to request an empty environment @@ -146,6 +244,26 @@ impl CmdRun { ); } + let data = self.annotation.get_data()?; + if !data.is_empty() { + if config.storage.encoding_format == EncodingFormat::Legacy { + return Err(spfs::Error::String( + "Cannot use '--annotation' when spfs is configured to use the 'Legacy' encoding format".to_string(), + ) + .into()); + } + + // These are added in reverse order so that the ones + // specified later on the command line will take precedence. + for (key, value) in data.into_iter().rev() { + tracing::trace!("annotation being added: {key}: {value}"); + runtime + .add_annotation(key, value, config.filesystem.annotation_size_limit) + .await?; + } + tracing::trace!(" with annotation: {:?}", runtime); + } + let start_time = Instant::now(); runtime.config.mount_backend = config.filesystem.backend; runtime.config.secondary_repositories = config.get_secondary_runtime_repositories(); diff --git a/crates/spfs-cli/main/src/cmd_run_test.rs b/crates/spfs-cli/main/src/cmd_run_test.rs new file mode 100644 index 0000000000..32edac6c36 --- /dev/null +++ b/crates/spfs-cli/main/src/cmd_run_test.rs @@ -0,0 +1,198 @@ +// Copyright (c) Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk + +use std::fs; + +use rstest::rstest; + +use super::fixtures::*; +use super::Annotation; + +// Test - --extra-data field1:value1 --extra-data field2:value2 +// Test - --extra-data field1=value1 --extra-data field2=value2 +// Test - --extra-data field1:value1 --extra-data field2=value2 +// Test - --extra-data field1=value1 --extra-data field2:value2 +// Test - --extra-data '{ field1: value1, field2: value2 }' +#[rstest] +#[case(vec!["field1:value1".to_string(), "field2:value2".to_string()])] +#[case(vec!["field1=value1".to_string(), "field2=value2".to_string()])] +#[case(vec!["field1:value1".to_string(), "field2=value2".to_string()])] +#[case(vec!["field1=value1".to_string(), "field2:value2".to_string()])] +#[case(vec!["{field1: value1, field2: value2}".to_string()])] +fn test_cmd_run_create_annotation(#[case] values: Vec) { + // Setup some data for the key value pairs + let field1 = "field1".to_string(); + let field2 = "field2".to_string(); + let value1 = "value1".to_string(); + let value2 = "value2".to_string(); + + let filenames = Vec::new(); + + // Test - --annotation field1:value1 --annotation field2:value2 + let data = Annotation { + annotation: values, + annotation_file: filenames, + }; + + let result = data + .get_data() + .expect("unable to parse cmd_run --annotation values"); + assert!(!result.is_empty()); + assert!(result.len() == 2); + println!("r0: {:?}", result[0]); + assert!(result[0] == (field1, value1)); + println!("r1: {:?}", result[1]); + assert!(result[1] == (field2, value2)); +} + +#[rstest] +#[should_panic] +#[case(vec!["field1,value1".to_string(), "field2/value2".to_string()])] +#[should_panic] +#[case(vec!["field1 value1".to_string(), "field2 value2".to_string()])] +#[should_panic] +#[case(vec!["{field1: value1, field2: - value2 - value1}".to_string()])] +fn test_cmd_run_create_annotation_invalid(#[case] invalid_values: Vec) { + let filenames = Vec::new(); + + // Test - --annotation with_an_invalid_argument_value + let data = Annotation { + annotation: invalid_values, + annotation_file: filenames, + }; + + let _result = data + .get_data() + .expect("unable to parse cmd_run --annotation values"); +} + +#[rstest] +fn test_cmd_run_create_annotation_from_file(tmpdir: tempfile::TempDir) { + // Setup some data for the key value pairs + let field1 = "field1".to_string(); + let field2 = "field2".to_string(); + let value1 = "value1".to_string(); + let value2 = "value2".to_string(); + let annotation = format!("{field1}: {value1}\n{field2}: {value2}\n"); + + let filename = tmpdir.path().join("filename.yaml"); + fs::write(filename.clone(), annotation) + .expect("Unable to write annotation to file during setup"); + let filenames = vec![filename]; + + let values = Vec::new(); + + // Test - --annotation-file filename.yaml + let data = Annotation { + annotation: values, + annotation_file: filenames, + }; + + let result = data + .get_data() + .expect("unable to parse cmd_run --annotation values"); + assert!(!result.is_empty()); + assert!(result.len() == 2); + assert!(result[0] == (field1, value1)); + assert!(result[1] == (field2, value2)); +} + +#[rstest] +#[should_panic] +fn test_cmd_run_create_annotation_from_file_not_exist(tmpdir: tempfile::TempDir) { + // Setup a file name that does not exist + let filename = tmpdir.path().join("nosuchfile.yaml"); + let filenames = vec![filename]; + + let values = Vec::new(); + + // Test - --annotation-file nosuchfile.yaml + let data = Annotation { + annotation: values, + annotation_file: filenames, + }; + + let _result = data + .get_data() + .expect("unable to parse cmd_run --annotation values"); +} + +#[rstest] +#[should_panic] +fn test_cmd_run_create_annotation_from_file_invalid_keyvalues(tmpdir: tempfile::TempDir) { + // Setup some data for the key value pairs + let field1 = "field1".to_string(); + let field2 = "field2".to_string(); + let value1 = "value1".to_string(); + let value2 = "value2".to_string(); + let annotation = format!("{field1}: {value1}\n{field2}:\n - {value2}\n - {value1}\n"); + + let filename = tmpdir.path().join("filename.yaml"); + fs::write(filename.clone(), annotation) + .expect("Unable to write annotation to file during setup"); + let filenames = vec![filename]; + + let values = Vec::new(); + + // Test - --annotation-file filename.yaml that contains more than key-value string pairs + let data = Annotation { + annotation: values, + annotation_file: filenames, + }; + + let _result = data + .get_data() + .expect("unable to parse cmd_run --annotation values"); +} + +#[rstest] +fn test_cmd_run_create_annotation_all(tmpdir: tempfile::TempDir) { + // Setup some data for the key value pairs + let field1 = "field1".to_string(); + let field2 = "field2".to_string(); + let field3 = "field3".to_string(); + let field4 = "field4".to_string(); + + let value1 = "value1".to_string(); + let value2 = "value2".to_string(); + let value3 = "value3".to_string(); + let value4 = "value4".to_string(); + + let values: Vec = vec![ + format!("{field1}:{value1}"), + format!("{field2}={value2}"), + "{field3: value3, field4: value4}".to_string(), + ]; + + let annotation = format!("{field4}: {value4}\n{field3}: {value3}\n"); + + let filename = tmpdir.path().join("filename.yaml"); + fs::write(filename.clone(), annotation) + .expect("Unable to write annotation to file during setup"); + let filenames = vec![filename]; + + // Test - --annotation field1:value1 --annotation field2=value2 + // --annotation '{ field1: value1, field2: value2 }' + // --annotation-file filename.yaml + let data = Annotation { + annotation: values, + annotation_file: filenames, + }; + + let result = data + .get_data() + .expect("unable to parse cmd_run --annotation values"); + + assert!(!result.is_empty()); + assert!(result.len() == 6); + // from --annotation-file filename.toml + assert!(result[0] == (field3.clone(), value3.clone())); + assert!(result[1] == (field4.clone(), value4.clone())); + // from --annotation field1:value1 annotation field2:value2 + assert!(result[2] == (field1, value1)); + assert!(result[3] == (field2, value2)); + // from --annotation '{field3: value3, field4: value4}' + assert!(result[4] == (field3, value3)); + assert!(result[5] == (field4, value4)); +} diff --git a/crates/spfs-cli/main/src/cmd_runtime_info.rs b/crates/spfs-cli/main/src/cmd_runtime_info.rs index 7c2edbe767..254254a975 100644 --- a/crates/spfs-cli/main/src/cmd_runtime_info.rs +++ b/crates/spfs-cli/main/src/cmd_runtime_info.rs @@ -4,6 +4,7 @@ use clap::Args; use miette::{Context, IntoDiagnostic, Result}; +use spfs_cli_common as cli; /// Show the complete state of a runtime #[derive(Debug, Args)] @@ -12,6 +13,9 @@ pub struct CmdRuntimeInfo { #[clap(short, long)] remote: Option, + #[clap(flatten)] + annotation: cli::AnnotationViewing, + /// The name/id of the runtime to remove #[clap(env = "SPFS_RUNTIME")] name: String, @@ -28,6 +32,12 @@ impl CmdRuntimeInfo { }; let runtime = runtime_storage.read_runtime(&self.name).await?; + + if self.annotation.get_all || self.annotation.get.is_some() { + self.annotation.print_data(&runtime).await?; + return Ok(0); + } + serde_json::to_writer_pretty(std::io::stdout(), runtime.data()) .into_diagnostic() .wrap_err("Failed to generate json output")?; diff --git a/crates/spfs-cli/main/src/cmd_shell.rs b/crates/spfs-cli/main/src/cmd_shell.rs index fdf915f825..aebf373dc2 100644 --- a/crates/spfs-cli/main/src/cmd_shell.rs +++ b/crates/spfs-cli/main/src/cmd_shell.rs @@ -7,6 +7,7 @@ use miette::Result; use spfs_cli_common as cli; use super::cmd_run; +use super::cmd_run::Annotation; /// Enter a subshell in a configured spfs environment #[derive(Debug, Args)] @@ -48,6 +49,9 @@ pub struct CmdShell { #[clap(short, long, env = "SPFS_KEEP_RUNTIME")] pub keep_runtime: bool, + #[clap(flatten)] + pub annotation: Annotation, + /// The tag or id of the desired runtime /// /// Use '-' or nothing to request an empty environment @@ -67,6 +71,7 @@ impl CmdShell { runtime_name: self.runtime_name.clone(), reference: self.reference.clone(), keep_runtime: self.keep_runtime, + annotation: self.annotation.clone(), command: Default::default(), }; run_cmd.run(config).await diff --git a/crates/spfs-cli/main/src/fixtures.rs b/crates/spfs-cli/main/src/fixtures.rs new file mode 100644 index 0000000000..96eee98c40 --- /dev/null +++ b/crates/spfs-cli/main/src/fixtures.rs @@ -0,0 +1,10 @@ +use rstest::fixture; +use tempfile::TempDir; + +#[fixture] +pub fn tmpdir() -> TempDir { + tempfile::Builder::new() + .prefix("spfs-test-") + .tempdir() + .expect("failed to create dir for test") +} diff --git a/crates/spfs-proto/schema/spfs.fbs b/crates/spfs-proto/schema/spfs.fbs index 071aa1de1c..4e2fddd86c 100644 --- a/crates/spfs-proto/schema/spfs.fbs +++ b/crates/spfs-proto/schema/spfs.fbs @@ -20,7 +20,9 @@ table Platform { } table Layer { - manifest:Digest (required); + manifest:Digest; + // Can be empty + annotations:[Annotation] (required); } table Manifest { @@ -34,6 +36,26 @@ table Blob { payload:Digest (required); } +/// Annotation data that is small enough is stored as a string in the +/// layer, large data is stored outside the layer in a blob object +/// pointed at by a digest +union AnnotationValue { AnnotationString, AnnotationDigest } + +/// Needed because unions have to contain tables +table AnnotationString { + data:string; +} +/// Needed because unions have to contain tables +table AnnotationDigest { + digest:Digest; +} + +/// Annotation data held in a layer for use by external tools run inside a runtime +table Annotation { + key:string (required); + data:AnnotationValue (required); +} + /// Digest is the result of a hashing operation over binary data. struct Digest { bytes:[uint8:32]; // SHA-256 output len (256 / 8) diff --git a/crates/spfs-proto/src/spfs_generated.rs b/crates/spfs-proto/src/spfs_generated.rs index 3d69f434b0..227b0602b1 100644 --- a/crates/spfs-proto/src/spfs_generated.rs +++ b/crates/spfs-proto/src/spfs_generated.rs @@ -108,6 +108,100 @@ impl<'a> flatbuffers::Verifiable for Object { impl flatbuffers::SimpleToVerifyInSlice for Object {} pub struct ObjectUnionTableOffset {} +#[deprecated(since = "2.0.0", note = "Use associated constants instead. This will no longer be generated in 2021.")] +pub const ENUM_MIN_ANNOTATION_VALUE: u8 = 0; +#[deprecated(since = "2.0.0", note = "Use associated constants instead. This will no longer be generated in 2021.")] +pub const ENUM_MAX_ANNOTATION_VALUE: u8 = 2; +#[deprecated(since = "2.0.0", note = "Use associated constants instead. This will no longer be generated in 2021.")] +#[allow(non_camel_case_types)] +pub const ENUM_VALUES_ANNOTATION_VALUE: [AnnotationValue; 3] = [ + AnnotationValue::NONE, + AnnotationValue::AnnotationString, + AnnotationValue::AnnotationDigest, +]; + +/// Annotation data that is small enough is stored as a string in the +/// layer, large data is stored outside the layer in a blob object +/// pointed at by a digest +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +#[repr(transparent)] +pub struct AnnotationValue(pub u8); +#[allow(non_upper_case_globals)] +impl AnnotationValue { + pub const NONE: Self = Self(0); + pub const AnnotationString: Self = Self(1); + pub const AnnotationDigest: Self = Self(2); + + pub const ENUM_MIN: u8 = 0; + pub const ENUM_MAX: u8 = 2; + pub const ENUM_VALUES: &'static [Self] = &[ + Self::NONE, + Self::AnnotationString, + Self::AnnotationDigest, + ]; + /// Returns the variant's name or "" if unknown. + pub fn variant_name(self) -> Option<&'static str> { + match self { + Self::NONE => Some("NONE"), + Self::AnnotationString => Some("AnnotationString"), + Self::AnnotationDigest => Some("AnnotationDigest"), + _ => None, + } + } +} +impl core::fmt::Debug for AnnotationValue { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + if let Some(name) = self.variant_name() { + f.write_str(name) + } else { + f.write_fmt(format_args!("", self.0)) + } + } +} +impl<'a> flatbuffers::Follow<'a> for AnnotationValue { + type Inner = Self; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + let b = flatbuffers::read_scalar_at::(buf, loc); + Self(b) + } +} + +impl flatbuffers::Push for AnnotationValue { + type Output = AnnotationValue; + #[inline] + unsafe fn push(&self, dst: &mut [u8], _written_len: usize) { + flatbuffers::emplace_scalar::(dst, self.0); + } +} + +impl flatbuffers::EndianScalar for AnnotationValue { + type Scalar = u8; + #[inline] + fn to_little_endian(self) -> u8 { + self.0.to_le() + } + #[inline] + #[allow(clippy::wrong_self_convention)] + fn from_little_endian(v: u8) -> Self { + let b = u8::from_le(v); + Self(b) + } +} + +impl<'a> flatbuffers::Verifiable for AnnotationValue { + #[inline] + fn run_verifier( + v: &mut flatbuffers::Verifier, pos: usize + ) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use self::flatbuffers::Verifiable; + u8::run_verifier(v, pos) + } +} + +impl flatbuffers::SimpleToVerifyInSlice for AnnotationValue {} +pub struct AnnotationValueUnionTableOffset {} + #[deprecated(since = "2.0.0", note = "Use associated constants instead. This will no longer be generated in 2021.")] pub const ENUM_MIN_ENTRY_KIND: u8 = 0; #[deprecated(since = "2.0.0", note = "Use associated constants instead. This will no longer be generated in 2021.")] @@ -390,6 +484,7 @@ impl<'a> flatbuffers::Follow<'a> for Layer<'a> { impl<'a> Layer<'a> { pub const VT_MANIFEST: flatbuffers::VOffsetT = 4; + pub const VT_ANNOTATIONS: flatbuffers::VOffsetT = 6; #[inline] pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { @@ -401,17 +496,25 @@ impl<'a> Layer<'a> { args: &'args LayerArgs<'args> ) -> flatbuffers::WIPOffset> { let mut builder = LayerBuilder::new(_fbb); + if let Some(x) = args.annotations { builder.add_annotations(x); } if let Some(x) = args.manifest { builder.add_manifest(x); } builder.finish() } #[inline] - pub fn manifest(&self) -> &'a Digest { + pub fn manifest(&self) -> Option<&'a Digest> { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::(Layer::VT_MANIFEST, None)} + } + #[inline] + pub fn annotations(&self) -> flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset>> { // Safety: // Created from valid Table for this object // which contains a valid value in this slot - unsafe { self._tab.get::(Layer::VT_MANIFEST, None).unwrap()} + unsafe { self._tab.get::>>>(Layer::VT_ANNOTATIONS, None).unwrap()} } } @@ -422,19 +525,22 @@ impl flatbuffers::Verifiable for Layer<'_> { ) -> Result<(), flatbuffers::InvalidFlatbuffer> { use self::flatbuffers::Verifiable; v.visit_table(pos)? - .visit_field::("manifest", Self::VT_MANIFEST, true)? + .visit_field::("manifest", Self::VT_MANIFEST, false)? + .visit_field::>>>("annotations", Self::VT_ANNOTATIONS, true)? .finish(); Ok(()) } } pub struct LayerArgs<'a> { pub manifest: Option<&'a Digest>, + pub annotations: Option>>>>, } impl<'a> Default for LayerArgs<'a> { #[inline] fn default() -> Self { LayerArgs { - manifest: None, // required field + manifest: None, + annotations: None, // required field } } } @@ -449,6 +555,10 @@ impl<'a: 'b, 'b> LayerBuilder<'a, 'b> { self.fbb_.push_slot_always::<&Digest>(Layer::VT_MANIFEST, manifest); } #[inline] + pub fn add_annotations(&mut self, annotations: flatbuffers::WIPOffset>>>) { + self.fbb_.push_slot_always::>(Layer::VT_ANNOTATIONS, annotations); + } + #[inline] pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> LayerBuilder<'a, 'b> { let start = _fbb.start_table(); LayerBuilder { @@ -459,7 +569,7 @@ impl<'a: 'b, 'b> LayerBuilder<'a, 'b> { #[inline] pub fn finish(self) -> flatbuffers::WIPOffset> { let o = self.fbb_.end_table(self.start_); - self.fbb_.required(o, Layer::VT_MANIFEST,"manifest"); + self.fbb_.required(o, Layer::VT_ANNOTATIONS,"annotations"); flatbuffers::WIPOffset::new(o.value()) } } @@ -468,6 +578,7 @@ impl core::fmt::Debug for Layer<'_> { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { let mut ds = f.debug_struct("Layer"); ds.field("manifest", &self.manifest()); + ds.field("annotations", &self.annotations()); ds.finish() } } @@ -686,6 +797,388 @@ impl core::fmt::Debug for Blob<'_> { ds.finish() } } +pub enum AnnotationStringOffset {} +#[derive(Copy, Clone, PartialEq)] + +/// Needed because unions have to contain tables +pub struct AnnotationString<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for AnnotationString<'a> { + type Inner = AnnotationString<'a>; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { _tab: flatbuffers::Table::new(buf, loc) } + } +} + +impl<'a> AnnotationString<'a> { + pub const VT_DATA: flatbuffers::VOffsetT = 4; + + #[inline] + pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + AnnotationString { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + args: &'args AnnotationStringArgs<'args> + ) -> flatbuffers::WIPOffset> { + let mut builder = AnnotationStringBuilder::new(_fbb); + if let Some(x) = args.data { builder.add_data(x); } + builder.finish() + } + + + #[inline] + pub fn data(&self) -> Option<&'a str> { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::>(AnnotationString::VT_DATA, None)} + } +} + +impl flatbuffers::Verifiable for AnnotationString<'_> { + #[inline] + fn run_verifier( + v: &mut flatbuffers::Verifier, pos: usize + ) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use self::flatbuffers::Verifiable; + v.visit_table(pos)? + .visit_field::>("data", Self::VT_DATA, false)? + .finish(); + Ok(()) + } +} +pub struct AnnotationStringArgs<'a> { + pub data: Option>, +} +impl<'a> Default for AnnotationStringArgs<'a> { + #[inline] + fn default() -> Self { + AnnotationStringArgs { + data: None, + } + } +} + +pub struct AnnotationStringBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> AnnotationStringBuilder<'a, 'b> { + #[inline] + pub fn add_data(&mut self, data: flatbuffers::WIPOffset<&'b str>) { + self.fbb_.push_slot_always::>(AnnotationString::VT_DATA, data); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> AnnotationStringBuilder<'a, 'b> { + let start = _fbb.start_table(); + AnnotationStringBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +impl core::fmt::Debug for AnnotationString<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut ds = f.debug_struct("AnnotationString"); + ds.field("data", &self.data()); + ds.finish() + } +} +pub enum AnnotationDigestOffset {} +#[derive(Copy, Clone, PartialEq)] + +/// Needed because unions have to contain tables +pub struct AnnotationDigest<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for AnnotationDigest<'a> { + type Inner = AnnotationDigest<'a>; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { _tab: flatbuffers::Table::new(buf, loc) } + } +} + +impl<'a> AnnotationDigest<'a> { + pub const VT_DIGEST: flatbuffers::VOffsetT = 4; + + #[inline] + pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + AnnotationDigest { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + args: &'args AnnotationDigestArgs<'args> + ) -> flatbuffers::WIPOffset> { + let mut builder = AnnotationDigestBuilder::new(_fbb); + if let Some(x) = args.digest { builder.add_digest(x); } + builder.finish() + } + + + #[inline] + pub fn digest(&self) -> Option<&'a Digest> { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::(AnnotationDigest::VT_DIGEST, None)} + } +} + +impl flatbuffers::Verifiable for AnnotationDigest<'_> { + #[inline] + fn run_verifier( + v: &mut flatbuffers::Verifier, pos: usize + ) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use self::flatbuffers::Verifiable; + v.visit_table(pos)? + .visit_field::("digest", Self::VT_DIGEST, false)? + .finish(); + Ok(()) + } +} +pub struct AnnotationDigestArgs<'a> { + pub digest: Option<&'a Digest>, +} +impl<'a> Default for AnnotationDigestArgs<'a> { + #[inline] + fn default() -> Self { + AnnotationDigestArgs { + digest: None, + } + } +} + +pub struct AnnotationDigestBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> AnnotationDigestBuilder<'a, 'b> { + #[inline] + pub fn add_digest(&mut self, digest: &Digest) { + self.fbb_.push_slot_always::<&Digest>(AnnotationDigest::VT_DIGEST, digest); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> AnnotationDigestBuilder<'a, 'b> { + let start = _fbb.start_table(); + AnnotationDigestBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +impl core::fmt::Debug for AnnotationDigest<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut ds = f.debug_struct("AnnotationDigest"); + ds.field("digest", &self.digest()); + ds.finish() + } +} +pub enum AnnotationOffset {} +#[derive(Copy, Clone, PartialEq)] + +/// Annotation data held in a layer for use by external tools run inside a runtime +pub struct Annotation<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for Annotation<'a> { + type Inner = Annotation<'a>; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { _tab: flatbuffers::Table::new(buf, loc) } + } +} + +impl<'a> Annotation<'a> { + pub const VT_KEY: flatbuffers::VOffsetT = 4; + pub const VT_DATA_TYPE: flatbuffers::VOffsetT = 6; + pub const VT_DATA: flatbuffers::VOffsetT = 8; + + #[inline] + pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + Annotation { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + args: &'args AnnotationArgs<'args> + ) -> flatbuffers::WIPOffset> { + let mut builder = AnnotationBuilder::new(_fbb); + if let Some(x) = args.data { builder.add_data(x); } + if let Some(x) = args.key { builder.add_key(x); } + builder.add_data_type(args.data_type); + builder.finish() + } + + + #[inline] + pub fn key(&self) -> &'a str { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::>(Annotation::VT_KEY, None).unwrap()} + } + #[inline] + pub fn data_type(&self) -> AnnotationValue { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::(Annotation::VT_DATA_TYPE, Some(AnnotationValue::NONE)).unwrap()} + } + #[inline] + pub fn data(&self) -> flatbuffers::Table<'a> { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::>>(Annotation::VT_DATA, None).unwrap()} + } + #[inline] + #[allow(non_snake_case)] + pub fn data_as_annotation_string(&self) -> Option> { + if self.data_type() == AnnotationValue::AnnotationString { + let u = self.data(); + // Safety: + // Created from a valid Table for this object + // Which contains a valid union in this slot + Some(unsafe { AnnotationString::init_from_table(u) }) + } else { + None + } + } + + #[inline] + #[allow(non_snake_case)] + pub fn data_as_annotation_digest(&self) -> Option> { + if self.data_type() == AnnotationValue::AnnotationDigest { + let u = self.data(); + // Safety: + // Created from a valid Table for this object + // Which contains a valid union in this slot + Some(unsafe { AnnotationDigest::init_from_table(u) }) + } else { + None + } + } + +} + +impl flatbuffers::Verifiable for Annotation<'_> { + #[inline] + fn run_verifier( + v: &mut flatbuffers::Verifier, pos: usize + ) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use self::flatbuffers::Verifiable; + v.visit_table(pos)? + .visit_field::>("key", Self::VT_KEY, true)? + .visit_union::("data_type", Self::VT_DATA_TYPE, "data", Self::VT_DATA, true, |key, v, pos| { + match key { + AnnotationValue::AnnotationString => v.verify_union_variant::>("AnnotationValue::AnnotationString", pos), + AnnotationValue::AnnotationDigest => v.verify_union_variant::>("AnnotationValue::AnnotationDigest", pos), + _ => Ok(()), + } + })? + .finish(); + Ok(()) + } +} +pub struct AnnotationArgs<'a> { + pub key: Option>, + pub data_type: AnnotationValue, + pub data: Option>, +} +impl<'a> Default for AnnotationArgs<'a> { + #[inline] + fn default() -> Self { + AnnotationArgs { + key: None, // required field + data_type: AnnotationValue::NONE, + data: None, // required field + } + } +} + +pub struct AnnotationBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> AnnotationBuilder<'a, 'b> { + #[inline] + pub fn add_key(&mut self, key: flatbuffers::WIPOffset<&'b str>) { + self.fbb_.push_slot_always::>(Annotation::VT_KEY, key); + } + #[inline] + pub fn add_data_type(&mut self, data_type: AnnotationValue) { + self.fbb_.push_slot::(Annotation::VT_DATA_TYPE, data_type, AnnotationValue::NONE); + } + #[inline] + pub fn add_data(&mut self, data: flatbuffers::WIPOffset) { + self.fbb_.push_slot_always::>(Annotation::VT_DATA, data); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> AnnotationBuilder<'a, 'b> { + let start = _fbb.start_table(); + AnnotationBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + self.fbb_.required(o, Annotation::VT_KEY,"key"); + self.fbb_.required(o, Annotation::VT_DATA,"data"); + flatbuffers::WIPOffset::new(o.value()) + } +} + +impl core::fmt::Debug for Annotation<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut ds = f.debug_struct("Annotation"); + ds.field("key", &self.key()); + ds.field("data_type", &self.data_type()); + match self.data_type() { + AnnotationValue::AnnotationString => { + if let Some(x) = self.data_as_annotation_string() { + ds.field("data", &x) + } else { + ds.field("data", &"InvalidFlatbuffer: Union discriminant does not match value.") + } + }, + AnnotationValue::AnnotationDigest => { + if let Some(x) = self.data_as_annotation_digest() { + ds.field("data", &x) + } else { + ds.field("data", &"InvalidFlatbuffer: Union discriminant does not match value.") + } + }, + _ => { + let x: Option<()> = None; + ds.field("data", &x) + }, + }; + ds.finish() + } +} pub enum TreeOffset {} #[derive(Copy, Clone, PartialEq)] diff --git a/crates/spfs/src/check.rs b/crates/spfs/src/check.rs index fc380bfdc0..ded852cd98 100644 --- a/crates/spfs/src/check.rs +++ b/crates/spfs/src/check.rs @@ -12,6 +12,7 @@ use once_cell::sync::OnceCell; use progress_bar_derive_macro::ProgressBar; use tokio::sync::Semaphore; +use crate::graph::AnnotationValue; use crate::prelude::*; use crate::sync::{SyncObjectResult, SyncPayloadResult, SyncPolicy}; use crate::{encoding, graph, storage, tracking, Error, Result}; @@ -333,10 +334,33 @@ where /// /// To also check if the layer object exists, use [`Self::check_digest`] pub async fn check_layer(&self, layer: graph::Layer) -> Result { - let result = self.check_digest(*layer.manifest()).await?; + let manifest_result = if let Some(manifest_digest) = layer.manifest() { + self.check_digest(*manifest_digest).await? + } else { + // This layer has no manifest, don't worry about it + CheckObjectResult::Ignorable + }; + let annotations = layer.annotations(); + let annotation_results = if annotations.is_empty() { + vec![CheckObjectResult::Annotation( + CheckAnnotationResult::InternalValue, + )] + } else { + let mut results = Vec::new(); + for entry in annotations { + results.push(CheckObjectResult::Annotation( + self.check_annotation(entry.into()).await?, + )); + } + results + }; + + let mut results = vec![manifest_result]; + results.extend(annotation_results); + let res = CheckLayerResult { layer, - result, + results, repaired: false, }; Ok(res) @@ -362,6 +386,26 @@ where Ok(res) } + /// Validate that the identified annotation layer's value exists. + pub async fn check_annotation( + &self, + annotation: graph::Annotation<'_>, + ) -> Result { + let res = match annotation.value() { + AnnotationValue::String(_) => CheckAnnotationResult::InternalValue, + AnnotationValue::Blob(d) => { + let blob = self.repo.read_blob(d).await?; + let result = unsafe { self.check_blob(&blob).await? }; + CheckAnnotationResult::Checked { + digest: d, + result, + repaired: false, + } + } + }; + Ok(res) + } + /// Validate that the identified blob has its payload. /// /// To also check if the blob object exists, use [`Self::check_digest`] @@ -801,6 +845,7 @@ pub enum CheckObjectResult { Layer(Box), Blob(CheckBlobResult), Manifest(CheckManifestResult), + Annotation(CheckAnnotationResult), } impl CheckObjectResult { @@ -815,6 +860,7 @@ impl CheckObjectResult { CheckObjectResult::Layer(r) => r.set_repaired(), CheckObjectResult::Blob(r) => r.set_repaired(), CheckObjectResult::Manifest(r) => r.set_repaired(), + CheckObjectResult::Annotation(r) => r.set_repaired(), } } @@ -831,6 +877,7 @@ impl CheckObjectResult { Layer(res) => res.summary(), Blob(res) => res.summary(), Manifest(res) => res.summary(), + Annotation(res) => res.summary(), } } } @@ -862,7 +909,7 @@ impl CheckPlatformResult { pub struct CheckLayerResult { pub repaired: bool, pub layer: graph::Layer, - pub result: CheckObjectResult, + pub results: Vec, } impl CheckLayerResult { @@ -872,7 +919,7 @@ impl CheckLayerResult { } pub fn summary(&self) -> CheckSummary { - let mut summary = self.result.summary(); + let mut summary: CheckSummary = self.results.iter().map(|r| r.summary()).sum(); summary += CheckSummary::checked_one_object(); if self.repaired { summary.repaired_objects += 1; @@ -904,6 +951,34 @@ impl CheckManifestResult { } } +#[derive(Debug)] +pub enum CheckAnnotationResult { + /// The annotation was stored directly in the layer and did not + /// need checking + InternalValue, + /// The annotation was stored in a blob and was checked + Checked { + digest: encoding::Digest, + result: CheckBlobResult, + repaired: bool, + }, +} + +impl CheckAnnotationResult { + fn set_repaired(&mut self) { + if let Self::Checked { repaired, .. } = self { + *repaired = true; + } + } + + pub fn summary(&self) -> CheckSummary { + match self { + Self::InternalValue => CheckSummary::default(), + Self::Checked { result, .. } => result.summary(), + } + } +} + #[derive(Debug)] pub enum CheckEntryResult { /// The entry was not one that needed checking diff --git a/crates/spfs/src/config.rs b/crates/spfs/src/config.rs index 7947e14a5f..3b169ba001 100644 --- a/crates/spfs/src/config.rs +++ b/crates/spfs/src/config.rs @@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize}; use storage::{FromConfig, FromUrl}; use tokio_stream::StreamExt; +use crate::graph::DEFAULT_SPFS_ANNOTATION_LAYER_MAX_STRING_VALUE_SIZE; use crate::storage::{TagNamespaceBuf, TagStorageMut}; use crate::{graph, runtime, storage, tracking, Error, Result}; @@ -322,6 +323,13 @@ pub struct Filesystem { /// systems that can perform read-through lookups, such as FUSE. #[serde(default = "Filesystem::default_secondary_repositories")] pub secondary_repositories: Vec, + + /// The size limit for an annotation before the data is stored in + /// a sepearate blob payload referenced in an annotation + /// layer. Data values smaller than or equal to this are stored + /// directly in the annotation layer. + #[serde(default = "Filesystem::default_annotation_size_limit")] + pub annotation_size_limit: usize, } impl Filesystem { @@ -330,6 +338,13 @@ impl Filesystem { pub fn default_secondary_repositories() -> Vec { vec![String::from("origin")] } + + /// The default size limit for a piece of annotation data before it + /// is stored in a separate blob payload from the annotation + /// layer that contains it + pub fn default_annotation_size_limit() -> usize { + DEFAULT_SPFS_ANNOTATION_LAYER_MAX_STRING_VALUE_SIZE + } } /// Configuration options for the fuse filesystem process diff --git a/crates/spfs/src/find_path.rs b/crates/spfs/src/find_path.rs index 00316f5b82..a013e0bdf0 100644 --- a/crates/spfs/src/find_path.rs +++ b/crates/spfs/src/find_path.rs @@ -89,13 +89,15 @@ async fn find_path_in_spfs_item( } graph::object::Enum::Layer(obj) => { - let item = repo.read_object(*obj.manifest()).await?; - let paths_to_file = find_path_in_spfs_item(filepath, &item, repo).await?; - for path in paths_to_file { - let mut new_path: ObjectPath = Vec::new(); - new_path.push(ObjectPathEntry::Parent(obj.to_object())); - new_path.extend(path); - paths.push(new_path); + if let Some(manifest_digest) = obj.manifest() { + let item = repo.read_object(*manifest_digest).await?; + let paths_to_file = find_path_in_spfs_item(filepath, &item, repo).await?; + for path in paths_to_file { + let mut new_path: ObjectPath = Vec::new(); + new_path.push(ObjectPathEntry::Parent(obj.to_object())); + new_path.extend(path); + paths.push(new_path); + } } } @@ -115,9 +117,8 @@ async fn find_path_in_spfs_item( } graph::object::Enum::Blob(_) => { - // These are not examined here when searching for the - // filepath because the filepath will be found by walking - // Manifest objects. + // Not examined here when searching for the filepath because + // filepaths are only found by walking Manifest objects. } }; diff --git a/crates/spfs/src/graph/annotation.rs b/crates/spfs/src/graph/annotation.rs new file mode 100644 index 0000000000..a0a22dc9d6 --- /dev/null +++ b/crates/spfs/src/graph/annotation.rs @@ -0,0 +1,191 @@ +// Copyright (c) Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk + +use std::fmt::Display; + +use crate::{encoding, Result}; + +#[cfg(test)] +#[path = "./annotation_test.rs"] +mod annotation_test; + +/// Default size limit for string valus stored directly in an +/// annotation object. Values larger than this are stored in a blob +/// that is referenced from the annotation object. +pub const DEFAULT_SPFS_ANNOTATION_LAYER_MAX_STRING_VALUE_SIZE: usize = 16 * 1024; + +/// Legacy encoding values for distinguishing the kind of +/// AnnotationValue being encoded. +#[repr(u8)] +enum AnnotationValueKind { + String = 1, + Blob = 2, +} + +/// Wrapper for the ways annotation values are stored +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub enum AnnotationValue { + /// In the Annotation object as a string + String(String), + /// In a separate blob payload pointed at by the digest + Blob(encoding::Digest), +} + +impl Default for AnnotationValue { + fn default() -> Self { + AnnotationValue::String(Default::default()) + } +} + +impl AnnotationValue { + pub fn build( + &self, + builder: &mut flatbuffers::FlatBufferBuilder<'_>, + ) -> flatbuffers::WIPOffset { + match self { + AnnotationValue::String(data_string) => { + let string_data = builder.create_string(data_string.as_ref()); + spfs_proto::AnnotationString::create( + builder, + &spfs_proto::AnnotationStringArgs { + data: Some(string_data), + }, + ) + .as_union_value() + } + AnnotationValue::Blob(data_digest) => spfs_proto::AnnotationDigest::create( + builder, + &spfs_proto::AnnotationDigestArgs { + digest: Some(data_digest), + }, + ) + .as_union_value(), + } + } + + /// The underlying spfs_proto enum entry for this kind of value. + pub fn to_proto(&self) -> spfs_proto::AnnotationValue { + match self { + AnnotationValue::String(_) => spfs_proto::AnnotationValue::AnnotationString, + AnnotationValue::Blob(_) => spfs_proto::AnnotationValue::AnnotationDigest, + } + } + + /// True if this value is stored directly as a string + pub fn is_string(&self) -> bool { + matches!(self, Self::String(_)) + } + + /// True if this value is stored in-directly in blob object + pub fn is_blob(&self) -> bool { + matches!(self, Self::Blob(_)) + } + + /// Note: This is used for legacy and new style digest calculations + pub fn legacy_encode(&self, mut writer: &mut impl std::io::Write) -> Result<()> { + match self { + AnnotationValue::String(v) => { + encoding::write_uint8(&mut writer, AnnotationValueKind::String as u8)?; + Ok(encoding::write_string(writer, v.as_str())?) + } + AnnotationValue::Blob(v) => { + encoding::write_uint8(&mut writer, AnnotationValueKind::Blob as u8)?; + Ok(encoding::write_digest(writer, v)?) + } + } + } +} + +impl Display for AnnotationValue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AnnotationValue::String(v) => { + write!(f, "{v}") + } + AnnotationValue::Blob(v) => { + write!(f, "{v}") + } + } + } +} + +/// Annotation represents a key-value pair of data from an external +/// program injected into a spfs runtime layer for later use by another +/// external program. +#[derive(Copy, Clone)] +pub struct Annotation<'buf>(pub(super) spfs_proto::Annotation<'buf>); + +impl<'buf> std::fmt::Debug for Annotation<'buf> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Annotation") + .field("key", &self.key()) + .field("value", &self.value()) + .finish() + } +} + +impl<'buf> From> for Annotation<'buf> { + fn from(value: spfs_proto::Annotation<'buf>) -> Self { + Self(value) + } +} + +impl<'buf> Annotation<'buf> { + #[inline] + pub fn key(&self) -> &'buf str { + self.0.key() + } + + pub fn value(&self) -> AnnotationValue { + if let Some(data) = self.0.data_as_annotation_string() { + match data.data() { + Some(s) => AnnotationValue::String(s.into()), + None => { + panic!("This should not happen because the data type was AnnotationValueString") + } + } + } else if let Some(data) = self.0.data_as_annotation_digest() { + match data.digest() { + Some(d) => AnnotationValue::Blob(*d), + None => { + panic!("This should not happen because the data type was AnnotationValueDigest") + } + } + } else { + panic!("This should not happen because the data type was AnnotationValueDigest") + } + } + + /// Return the child objects of this object, if any. + pub fn child_objects(&self) -> Vec { + let mut result = Vec::new(); + if let Some(data) = self.0.data_as_annotation_digest() { + if let Some(d) = data.digest() { + result.push(*d) + } + } + result + } + + /// Return the size of this annotation + pub fn size(&self) -> u64 { + match &self.value() { + AnnotationValue::String(v) => v.len() as u64, + AnnotationValue::Blob(d) => d.len() as u64, + } + } + + // Note: This is used for legacy and new style digest calculations + pub fn legacy_encode(&self, mut writer: &mut impl std::io::Write) -> Result<()> { + encoding::write_string(&mut writer, self.key())?; + self.value().legacy_encode(&mut writer)?; + Ok(()) + } +} + +impl<'buf> std::fmt::Display for Annotation<'buf> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{}: {:?}", self.key(), self.value())) + } +} diff --git a/crates/spfs/src/graph/annotation_test.rs b/crates/spfs/src/graph/annotation_test.rs new file mode 100644 index 0000000000..474556addf --- /dev/null +++ b/crates/spfs/src/graph/annotation_test.rs @@ -0,0 +1,28 @@ +// Copyright (c) Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk + +use rstest::rstest; +use spfs_encoding::Digest; + +use super::AnnotationValue; +use crate::encoding; + +#[rstest] +fn test_annotationvalue_string() { + let value = String::from("value"); + let string_value = AnnotationValue::String(value); + + assert!(string_value.is_string()); + assert!(!string_value.is_blob()); +} + +#[rstest] +fn test_annotationvalue_blob() { + let digest: Digest = encoding::EMPTY_DIGEST.into(); + + let blob_value = AnnotationValue::Blob(digest); + + assert!(blob_value.is_blob()); + assert!(!blob_value.is_string()); +} diff --git a/crates/spfs/src/graph/layer.rs b/crates/spfs/src/graph/layer.rs index 902bd5381f..3bab7a068d 100644 --- a/crates/spfs/src/graph/layer.rs +++ b/crates/spfs/src/graph/layer.rs @@ -5,7 +5,7 @@ use spfs_proto::LayerArgs; use super::object::HeaderBuilder; -use super::ObjectKind; +use super::{Annotation, AnnotationValue, ObjectKind}; use crate::{encoding, Error, Result}; #[cfg(test)] @@ -22,7 +22,13 @@ pub type Layer = super::object::FlatObject>; impl std::fmt::Debug for Layer { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Layer") - .field("manifest", &self.manifest().to_string()) + .field( + "manifest", + &self + .manifest() + .map_or(String::from("None"), |d| d.to_string()), + ) + .field("annotations", &self.annotations()) .finish() } } @@ -36,24 +42,107 @@ impl Layer { Self::builder().with_manifest(manifest).build() } + /// Build a layer with the default header that has the provided + /// annotation data but does not point at any manifest, for more + /// configuration use [`Self::builder`] + #[inline] + pub fn new_with_annotation(key: String, value: AnnotationValue) -> Self { + Self::builder().with_annotation(key, value).build() + } + + /// Build a layer with the default header that has the provided + /// annotation data but does not point at any manifest, for more + /// configuration use [`Self::builder`] + #[inline] + pub fn new_with_annotations(annotations: Vec) -> Self { + Self::builder().with_annotations(annotations).build() + } + + /// Build a layer with the default header that points at the + /// provided manifest digest and the provided annotation, for + /// more configuration use [`Self::builder`] + #[inline] + pub fn new_with_manifest_and_annotation( + manifest: encoding::Digest, + key: String, + value: AnnotationValue, + ) -> Self { + Self::builder() + .with_manifest(manifest) + .with_annotation(key, value) + .build() + } + + /// Build a layer with the default header that points at the + /// provided manifest digest and the provided annotation, for + /// more configuration use [`Self::builder`] + #[inline] + pub fn new_with_manifest_and_annotations( + manifest: encoding::Digest, + annotations: Vec, + ) -> Self { + Self::builder() + .with_manifest(manifest) + .with_annotations(annotations) + .build() + } + #[inline] pub fn builder() -> LayerBuilder { LayerBuilder::default() } #[inline] - pub fn manifest(&self) -> &encoding::Digest { + pub fn manifest(&self) -> Option<&encoding::Digest> { self.proto().manifest() } + #[inline] + pub fn annotations(&self) -> Vec { + self.proto() + .annotations() + .iter() + .collect::>() + } + /// Return the child object of this one in the object DG. #[inline] pub fn child_objects(&self) -> Vec { - vec![*self.manifest()] + let mut children = Vec::new(); + if let Some(manifest_digest) = self.manifest() { + children.push(*manifest_digest) + } + for entry in self.annotations() { + let annotation: Annotation = entry.into(); + children.extend(annotation.child_objects()); + } + children } - pub(super) fn legacy_encode(&self, writer: &mut impl std::io::Write) -> Result<()> { - encoding::write_digest(writer, self.manifest()).map_err(Error::Encoding) + pub(super) fn legacy_encode(&self, mut writer: &mut impl std::io::Write) -> Result<()> { + let annotations = self.annotations(); + let result = if let Some(manifest_digest) = self.manifest() { + let manifest_result = + encoding::write_digest(&mut writer, manifest_digest).map_err(Error::Encoding); + for entry in annotations { + let annotation: Annotation = entry.into(); + annotation.legacy_encode(&mut writer)?; + } + manifest_result + } else if !annotations.is_empty() { + for entry in annotations { + let annotation: Annotation = entry.into(); + annotation.legacy_encode(&mut writer)?; + } + Ok(()) + } else { + Err(Error::String( + "Invalid Layer object for legacy encoding, it has no manifest or annotation data" + .to_string(), + )) + }; + + result } } @@ -71,16 +160,22 @@ impl std::cmp::PartialEq for Layer { impl std::cmp::Eq for Layer {} +/// Data type for pairs of keys and annotation values used during +/// construction of a layer's annotations. +pub type KeyAnnotationValuePair = (String, AnnotationValue); + pub struct LayerBuilder { header: super::object::HeaderBuilder, - manifest: encoding::Digest, + manifest: Option, + annotations: Vec, } impl Default for LayerBuilder { fn default() -> Self { Self { header: super::object::HeaderBuilder::new(ObjectKind::Layer), - manifest: encoding::NULL_DIGEST.into(), + manifest: None, + annotations: Vec::new(), } } } @@ -95,16 +190,45 @@ impl LayerBuilder { } pub fn with_manifest(mut self, manifest: encoding::Digest) -> Self { - self.manifest = manifest; + self.manifest = Some(manifest); + self + } + + pub fn with_annotation(mut self, key: String, value: AnnotationValue) -> Self { + self.annotations.push((key, value)); + self + } + + pub fn with_annotations(mut self, annotations: Vec) -> Self { + self.annotations.extend(annotations); self } pub fn build(&self) -> Layer { super::BUILDER.with_borrow_mut(|builder| { + let ffb_annotations: Vec<_> = self + .annotations + .iter() + .map(|(k, v)| { + let key = builder.create_string(k); + let value = v.build(builder); + spfs_proto::Annotation::create( + builder, + &spfs_proto::AnnotationArgs { + key: Some(key), + data_type: v.to_proto(), + data: Some(value), + }, + ) + }) + .collect(); + let annotations = Some(builder.create_vector(&ffb_annotations)); + let layer = spfs_proto::Layer::create( builder, &LayerArgs { - manifest: Some(&self.manifest), + manifest: self.manifest.as_ref(), + annotations, }, ); let any = spfs_proto::AnyObject::create( @@ -138,6 +262,10 @@ impl LayerBuilder { /// Read a data encoded using the legacy format, and /// use the data to fill and complete this builder pub fn legacy_decode(self, reader: &mut impl std::io::Read) -> Result { + tracing::trace!("layer legacy_decode called..."); + // Legacy layers do not have an annotation field. Trying + // to read a layer with no manifest and only an annotation + // here will fail. Ok(self.with_manifest(encoding::read_digest(reader)?).build()) } } diff --git a/crates/spfs/src/graph/layer_test.rs b/crates/spfs/src/graph/layer_test.rs index 1bbfdf5be4..1878f9c751 100644 --- a/crates/spfs/src/graph/layer_test.rs +++ b/crates/spfs/src/graph/layer_test.rs @@ -7,10 +7,11 @@ use rstest::rstest; use super::Layer; use crate::encoding; use crate::encoding::prelude::*; -use crate::graph::Object; +use crate::graph::object::EncodingFormat; +use crate::graph::{AnnotationValue, Object}; #[rstest] -fn test_layer_encoding() { +fn test_layer_encoding_manifest_only() { let expected = Layer::new(encoding::EMPTY_DIGEST.into()); let mut stream = Vec::new(); expected.encode(&mut stream).unwrap(); @@ -20,3 +21,62 @@ fn test_layer_encoding() { .unwrap(); assert_eq!(actual.digest().unwrap(), expected.digest().unwrap()) } + +#[rstest] +fn test_layer_encoding_annotation_only() { + let expected = Layer::new_with_annotation( + "key".to_string(), + AnnotationValue::String("value".to_string()), + ); + tracing::error!("Expected: {:?}", expected); + + let mut stream = Vec::new(); + expected.encode(&mut stream).unwrap(); + + let decoded = Object::decode(&mut stream.as_slice()); + if EncodingFormat::default() == EncodingFormat::Legacy { + if decoded.is_ok() { + panic!("This test should fail when EncodingFormat::Legacy is the default") + } + // Don't run the rest of the test when EncodingFormat::Legacy is used + return; + }; + + let actual = decoded.unwrap().into_layer().unwrap(); + println!(" Actual: {:?}", actual); + + assert_eq!(actual.digest().unwrap(), expected.digest().unwrap()) +} + +#[rstest] +fn test_layer_encoding_manifest_and_annotations() { + let expected = Layer::new_with_manifest_and_annotations( + encoding::EMPTY_DIGEST.into(), + vec![( + "key".to_string(), + AnnotationValue::String("value".to_string()), + )], + ); + println!("Expected: {:?}", expected); + + let mut stream = Vec::new(); + expected.encode(&mut stream).unwrap(); + + let actual = Object::decode(&mut stream.as_slice()) + .unwrap() + .into_layer() + .unwrap(); + println!(" Actual: {:?}", actual); + + match EncodingFormat::default() { + EncodingFormat::Legacy => { + // Legacy encoding does not support annotaion data, so these won't match + assert_ne!(actual.digest().unwrap(), expected.digest().unwrap()) + } + EncodingFormat::FlatBuffers => { + // Under flatbuffers encoding both will contain the + // annotation data and will match + assert_eq!(actual.digest().unwrap(), expected.digest().unwrap()) + } + } +} diff --git a/crates/spfs/src/graph/mod.rs b/crates/spfs/src/graph/mod.rs index 87de2962b8..e5282400ae 100644 --- a/crates/spfs/src/graph/mod.rs +++ b/crates/spfs/src/graph/mod.rs @@ -4,6 +4,7 @@ //! Low-level digraph representation and manipulation for data storage. +mod annotation; mod blob; mod database; mod entry; @@ -18,6 +19,11 @@ mod tree; use std::cell::RefCell; +pub use annotation::{ + Annotation, + AnnotationValue, + DEFAULT_SPFS_ANNOTATION_LAYER_MAX_STRING_VALUE_SIZE, +}; pub use blob::Blob; pub use database::{ Database, @@ -28,7 +34,7 @@ pub use database::{ }; pub use entry::Entry; pub use kind::{HasKind, Kind, ObjectKind}; -pub use layer::Layer; +pub use layer::{KeyAnnotationValuePair, Layer}; pub use manifest::Manifest; pub use object::{FlatObject, Object, ObjectProto}; pub use platform::Platform; diff --git a/crates/spfs/src/graph/object.rs b/crates/spfs/src/graph/object.rs index 8789135ca5..58cd4afbcd 100644 --- a/crates/spfs/src/graph/object.rs +++ b/crates/spfs/src/graph/object.rs @@ -10,7 +10,7 @@ use encoding::prelude::*; use serde::{Deserialize, Serialize}; use super::error::{ObjectError, ObjectResult}; -use super::{Blob, DatabaseView, HasKind, Kind, Layer, Manifest, ObjectKind, Platform}; +use super::{Annotation, Blob, DatabaseView, HasKind, Kind, Layer, Manifest, ObjectKind, Platform}; use crate::encoding; use crate::storage::RepositoryHandle; @@ -157,8 +157,14 @@ impl Object { } } Enum::Layer(object) => { - let item = repo.read_object(*object.manifest()).await?; - next_iter_objects.push(item); + if let Some(manifest_digest) = object.manifest() { + let item = repo.read_object(*manifest_digest).await?; + next_iter_objects.push(item); + } + for entry in object.annotations() { + let annotation: Annotation = entry.into(); + total_size += annotation.size(); + } } Enum::Manifest(object) => { for node in object.to_tracking_manifest().walk_abs("/spfs") { diff --git a/crates/spfs/src/proto/conversions.rs b/crates/spfs/src/proto/conversions.rs index c3ec42321b..c75eb8ae8a 100644 --- a/crates/spfs/src/proto/conversions.rs +++ b/crates/spfs/src/proto/conversions.rs @@ -203,8 +203,15 @@ impl TryFrom for graph::Platform { impl From<&graph::Layer> for super::Layer { fn from(source: &graph::Layer) -> Self { + let mut annotations: Vec = Vec::new(); + for a in source.annotations() { + let annotation: graph::Annotation = a.into(); + annotations.push((&annotation).into()); + } + Self { - manifest: Some(source.manifest().into()), + manifest: source.manifest().map(|m| m.into()), + annotations, } } } @@ -212,7 +219,36 @@ impl From<&graph::Layer> for super::Layer { impl TryFrom for graph::Layer { type Error = Error; fn try_from(source: super::Layer) -> Result { - Ok(Self::new(convert_digest(source.manifest)?)) + let digest = Some(convert_digest(source.manifest)?); + + if let Some(manifest_digest) = digest { + if !source.annotations.is_empty() { + let mut annotations: Vec = Vec::new(); + for a in source.annotations.into_iter() { + annotations.push((a.key.clone(), a.value.try_into()?)); + } + // A spfs filesystem layer with some annotations + Ok(Self::new_with_manifest_and_annotations( + manifest_digest, + annotations, + )) + } else { + // A typical filesystem layer + Ok(Self::new(manifest_digest)) + } + } else if !source.annotations.is_empty() { + let mut annotations: Vec = Vec::new(); + for a in source.annotations.into_iter() { + annotations.push((a.key.clone(), a.value.try_into()?)); + } + + // An annotation only layer + Ok(Self::new_with_annotations(annotations)) + } else { + Err(Error::String( + "Creating a graph::Layer requires at least one of: a manifest digest, or an annotation".to_string(), + )) + } } } @@ -295,6 +331,31 @@ impl TryFrom for graph::Manifest { } } +impl From<&graph::Annotation<'_>> for super::Annotation { + fn from(source: &graph::Annotation) -> Self { + use super::annotation::Value; + Self { + key: source.key().to_string(), + value: match source.value() { + graph::AnnotationValue::String(s) => Some(Value::Data(s.clone())), + graph::AnnotationValue::Blob(d) => Some(Value::Digest(d.into())), + }, + } + } +} + +impl TryFrom> for graph::AnnotationValue { + type Error = Error; + fn try_from(source: Option) -> Result { + use super::annotation::Value; + match source { + Some(Value::Data(s)) => Ok(graph::AnnotationValue::String(s.clone())), + Some(Value::Digest(d)) => Ok(graph::AnnotationValue::Blob(convert_digest(Some(d))?)), + None => Ok(graph::AnnotationValue::String(Default::default())), + } + } +} + impl<'buf> From<&graph::Tree<'buf>> for super::Tree { fn from(source: &graph::Tree) -> Self { Self { diff --git a/crates/spfs/src/proto/defs/types.proto b/crates/spfs/src/proto/defs/types.proto index c056310c67..b7498f097d 100644 --- a/crates/spfs/src/proto/defs/types.proto +++ b/crates/spfs/src/proto/defs/types.proto @@ -30,6 +30,7 @@ message Platform { message Layer { Digest manifest = 1; + repeated Annotation annotations = 2; } message Manifest { @@ -37,6 +38,14 @@ message Manifest { repeated Tree trees = 2; } +message Annotation { + string key = 1; + oneof value { + string data = 2; + Digest digest = 3; + } +} + message Tree { repeated Entry entries = 1; } diff --git a/crates/spfs/src/resolve.rs b/crates/spfs/src/resolve.rs index 4088bd41a7..1a578b9f3f 100644 --- a/crates/spfs/src/resolve.rs +++ b/crates/spfs/src/resolve.rs @@ -155,12 +155,14 @@ pub async fn compute_environment_manifest( let layers = resolve_stack_to_layers(&stack, Some(repo)).await?; let mut manifest = tracking::Manifest::default(); for layer in layers { - manifest.update( - &repo - .read_manifest(*layer.manifest()) - .await? - .to_tracking_manifest(), - ) + if let Some(manifest_digest) = layer.manifest() { + manifest.update( + &repo + .read_manifest(*manifest_digest) + .await? + .to_tracking_manifest(), + ) + } } Ok(manifest) } @@ -170,16 +172,24 @@ pub async fn compute_object_manifest( repo: &storage::RepositoryHandle, ) -> Result { match obj.into_enum() { - graph::object::Enum::Layer(obj) => Ok(repo - .read_manifest(*obj.manifest()) - .await? - .to_tracking_manifest()), + graph::object::Enum::Layer(obj) => { + if let Some(manifest_digest) = obj.manifest() { + Ok(repo + .read_manifest(*manifest_digest) + .await? + .to_tracking_manifest()) + } else { + Ok(tracking::Manifest::default()) + } + } graph::object::Enum::Platform(obj) => { let layers = resolve_stack_to_layers(&obj.to_stack(), Some(repo)).await?; let mut manifest = tracking::Manifest::default(); for layer in layers.iter() { - let layer_manifest = repo.read_manifest(*layer.manifest()).await?; - manifest.update(&layer_manifest.to_tracking_manifest()); + if let Some(manifest_digest) = layer.manifest() { + let layer_manifest = repo.read_manifest(*manifest_digest).await?; + manifest.update(&layer_manifest.to_tracking_manifest()); + } } Ok(manifest) } @@ -247,10 +257,12 @@ where let layers = resolve_stack_to_layers_with_repo(&runtime.status.stack, &repo).await?; let mut manifests = Vec::with_capacity(layers.len()); for (index, layer) in layers.iter().enumerate() { - manifests.push(ResolvedManifest::Existing { - order: index, - manifest: repo.read_manifest(*layer.manifest()).await?, - }); + if let Some(manifest_digest) = layer.manifest() { + manifests.push(ResolvedManifest::Existing { + order: index, + manifest: repo.read_manifest(*manifest_digest).await?, + }); + } } // Determine if layers need to be combined to stay within the length limits diff --git a/crates/spfs/src/runtime/mod.rs b/crates/spfs/src/runtime/mod.rs index 071a9e4b10..8c59f71d45 100644 --- a/crates/spfs/src/runtime/mod.rs +++ b/crates/spfs/src/runtime/mod.rs @@ -24,6 +24,7 @@ pub use storage::{ BindMount, Config, Data, + KeyValuePair, LiveLayer, LiveLayerFile, MountBackend, diff --git a/crates/spfs/src/runtime/storage.rs b/crates/spfs/src/runtime/storage.rs index 1a2b65b1e8..ecfffd7a67 100644 --- a/crates/spfs/src/runtime/storage.rs +++ b/crates/spfs/src/runtime/storage.rs @@ -4,7 +4,7 @@ //! Definition and persistent storage of runtimes. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::env::temp_dir; use std::fmt::Display; use std::fs::OpenOptions; @@ -27,11 +27,14 @@ use tokio::io::AsyncReadExt; use super::startup_ps; #[cfg(unix)] use super::{startup_csh, startup_sh}; +use crate::encoding::Digest; use crate::env::SPFS_DIR_PREFIX; +use crate::graph::object::Enum; +use crate::graph::{Annotation, AnnotationValue, KeyAnnotationValuePair}; use crate::prelude::*; use crate::storage::fs::DURABLE_EDITS_DIR; use crate::storage::RepositoryHandle; -use crate::{bootstrap, encoding, graph, storage, tracking, Error, Result}; +use crate::{bootstrap, graph, storage, tracking, Error, Result}; #[cfg(test)] #[path = "./storage_test.rs"] @@ -51,6 +54,9 @@ const TRANSIENT: bool = false; const LIVE_LAYER_FILE_SUFFIX_YAML: &str = ".spfs.yaml"; const DEFAULT_LIVE_LAYER_FILENAME: &str = "layer.spfs.yaml"; +/// Data type for pairs of annotation keys and values +pub type KeyValuePair = (String, String); + /// Information about the source of a runtime #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct Author { @@ -79,7 +85,7 @@ pub struct Status { /// Additional layers that were created automatically due to the stack /// being too large. #[serde(default, skip_serializing_if = "HashSet::is_empty")] - pub(crate) flattened_layers: HashSet, + pub(crate) flattened_layers: HashSet, /// Whether or not this runtime is editable /// /// An editable runtime is mounted with working directories @@ -686,6 +692,97 @@ impl Runtime { self.data.is_durable() } + /// Store a list of arbitrary key-value string pairs in the runtime + pub async fn add_annotations( + &mut self, + data: Vec, + size_limit: usize, + ) -> Result<()> { + tracing::debug!("adding list of {} Annotations [{}]", data.len(), size_limit); + + let mut annotations: Vec = Vec::with_capacity(data.len()); + for (key, value) in data { + let annotation_value = if value.len() <= size_limit { + AnnotationValue::String(value) + } else { + let digest = self.storage.create_blob_for_string(value).await?; + tracing::debug!("annotation too large for Layer, created Blob for it: {digest}"); + AnnotationValue::Blob(digest) + }; + + annotations.push((key, annotation_value)); + } + + let layer = graph::Layer::new_with_annotations(annotations); + self.storage.inner.write_object(&layer).await?; + + // The new annotation is added to the bottom of the runtime's stack + let layer_digest = layer.digest()?; + self.push_digest(layer_digest); + + tracing::debug!("pushed layer with list of annotions to storage: {layer_digest}."); + Ok(()) + } + + /// Store an arbitrary key-value string pair in the runtime + pub async fn add_annotation( + &mut self, + key: String, + value: String, + size_limit: usize, + ) -> Result<()> { + tracing::debug!( + "adding Annotation: key: {} => value: {} [len: {} > {}]", + key, + value, + value.len(), + size_limit + ); + + let annotation_value = if value.len() <= size_limit { + AnnotationValue::String(value) + } else { + let digest = self.storage.create_blob_for_string(value).await?; + tracing::debug!("annotation too large for Layer, created Blob for it: {digest}"); + AnnotationValue::Blob(digest) + }; + + let layer = graph::Layer::new_with_annotation(key, annotation_value); + self.storage.inner.write_object(&layer).await?; + + // The new annotation is added to the bottom of the runtime's stack + let layer_digest = layer.digest()?; + self.push_digest(layer_digest); + + tracing::debug!("pushed layer with annotion to storage: {layer_digest}."); + Ok(()) + } + + /// Return the string value stored as annotation under the given key. + pub async fn annotation(&self, key: &str) -> Result> { + for digest in self.status.stack.iter_bottom_up() { + if let Some(s) = self.storage.find_annotation(&digest, key).await? { + return Ok(Some(s)); + } + } + Ok(None) + } + + /// Return all the string values that are stored as annotation under any key. + pub async fn all_annotations(&self) -> Result> { + let mut data: BTreeMap = BTreeMap::new(); + for digest in self.status.stack.iter_bottom_up() { + let pairs = self + .storage + .find_annotation_key_value_pairs(&digest) + .await?; + for (key, value) in pairs { + data.insert(key, value); + } + } + Ok(data) + } + /// Reset parts of the runtime's state so it can be reused in /// another process run. pub async fn reinit_for_reuse_and_save_to_storage(&mut self) -> Result<()> { @@ -874,7 +971,7 @@ impl Runtime { /// and change the overlayfs options, but not save the runtime or /// update any currently running environment. Returns false /// if adding the digest had no change to the runtime stack. - pub fn push_digest(&mut self, digest: encoding::Digest) -> bool { + pub fn push_digest(&mut self, digest: Digest) -> bool { self.status.stack.push(digest) } @@ -1150,6 +1247,123 @@ impl Storage { OwnedRuntime::upgrade_as_owner(rt).await } + /// Create a new blob payload to hold the given string value + pub(crate) async fn create_blob_for_string(&self, payload: String) -> Result { + self.inner + .commit_blob(Box::pin(std::io::Cursor::new(payload.into_bytes()))) + .await + } + + /// Returns the value from the first annotation object matching + /// the given key that can be found starting from the given + /// digest's spfs object. + pub(crate) async fn find_annotation( + &self, + digest: &Digest, + key: &str, + ) -> Result> { + let mut digests_to_process: Vec = vec![*digest]; + + while !digests_to_process.is_empty() { + let mut next_iter_digests: Vec = Vec::new(); + for digest in digests_to_process.iter() { + match self + .inner + .read_ref(digest.to_string().as_str()) + .await? + .into_enum() + { + Enum::Platform(platform) => { + for reference in platform.iter_bottom_up() { + next_iter_digests.push(*reference); + } + } + Enum::Layer(layer) => { + for entry in layer.annotations() { + let annotation: Annotation = entry.into(); + if annotation.key() == key { + let value = self.find_annotation_value(&annotation).await?; + return Ok(Some(value)); + } + } + } + _ => { + // None of the other objects contain Annotation + } + } + } + digests_to_process = std::mem::take(&mut next_iter_digests); + } + Ok(None) + } + + /// Returns all key-value pairs from the annotation object that + /// can be found starting from the given digest's spfs object. + pub(crate) async fn find_annotation_key_value_pairs( + &self, + digest: &Digest, + ) -> Result> { + let mut key_value_pairs: Vec = Vec::new(); + + let mut digests_to_process: Vec = vec![*digest]; + while !digests_to_process.is_empty() { + let mut next_iter_digests: Vec = Vec::new(); + for digest in digests_to_process.iter() { + match self + .inner + .read_ref(digest.to_string().as_str()) + .await? + .into_enum() + { + Enum::Platform(platform) => { + for reference in platform.iter_bottom_up() { + next_iter_digests.push(*reference); + } + } + Enum::Layer(layer) => { + for entry in layer.annotations() { + let annotation: Annotation = entry.into(); + let key = annotation.key().to_string(); + let value = self.find_annotation_value(&annotation).await?; + key_value_pairs.push((key, value)); + } + } + _ => { + // None of the other objects could contain + // pieces of Annotation + } + } + } + digests_to_process = std::mem::take(&mut next_iter_digests); + } + Ok(key_value_pairs) + } + + /// Return the value, as a string, from the given annotation, + /// loading the value from the blob referenced by the digest if + /// the value is stored in an external blob. + async fn find_annotation_value(&self, annotation: &Annotation<'_>) -> Result { + let data = match annotation.value() { + AnnotationValue::String(s) => s.clone(), + AnnotationValue::Blob(digest) => { + let blob = self.inner.read_blob(digest).await?; + let (mut payload, filename) = self.inner.open_payload(*blob.digest()).await?; + let mut writer: Vec = vec![]; + tokio::io::copy(&mut payload, &mut writer) + .await + .map_err(|err| { + Error::StorageReadError( + "copy of annotation payload to string buffer", + filename, + err, + ) + })?; + String::from_utf8(writer).map_err(|err| Error::String(err.to_string()))? + } + }; + Ok(data) + } + pub async fn durable_path(&self, name: String) -> Result { match &*self.inner { RepositoryHandle::FS(repo) => { diff --git a/crates/spfs/src/runtime/storage_test.rs b/crates/spfs/src/runtime/storage_test.rs index a4a54c3e4b..fabca9cd32 100644 --- a/crates/spfs/src/runtime/storage_test.rs +++ b/crates/spfs/src/runtime/storage_test.rs @@ -10,12 +10,16 @@ use std::str::FromStr; use futures::TryStreamExt; use rstest::rstest; +use spfs_encoding::Digestible; use super::{makedirs_with_perms, Data, Storage}; use crate::encoding; use crate::fixtures::*; +use crate::graph::object::EncodingFormat; +use crate::graph::{AnnotationValue, Layer, Platform}; use crate::runtime::storage::{LiveLayerApiVersion, LiveLayerContents}; -use crate::runtime::{BindMount, LiveLayer, LiveLayerFile}; +use crate::runtime::{BindMount, KeyValuePair, LiveLayer, LiveLayerFile}; +use crate::storage::prelude::Database; #[rstest] fn test_bindmount_creation() { @@ -172,6 +176,287 @@ async fn test_storage_create_runtime(tmpdir: tempfile::TempDir) { .is_err()); } +#[rstest] +#[tokio::test] +async fn test_storage_runtime_with_annotation(tmpdir: tempfile::TempDir) { + let root = tmpdir.path().to_string_lossy().to_string(); + let repo = crate::storage::RepositoryHandle::from( + crate::storage::fs::FsRepository::create(root) + .await + .unwrap(), + ); + let storage = Storage::new(repo).unwrap(); + let limit: usize = 16 * 1024; + + let keep_runtime = false; + let live_layers = Vec::new(); + let mut runtime = storage + .create_named_runtime("test-with-annotation-data", keep_runtime, live_layers) + .await + .expect("failed to create runtime in storage"); + + // Test - insert data + let key = "somefield".to_string(); + let value = "some value".to_string(); + assert!(runtime + .add_annotation(key.clone(), value.clone(), limit) + .await + .is_ok()); + + // Test - insert some more data + let value2 = "someothervalue".to_string(); + assert!(runtime + .add_annotation(key.clone(), value2, limit) + .await + .is_ok()); + + // Test - retrieve data - the first inserted data should be the + // what is retrieved because of how adding to the runtime stack + // works. + if EncodingFormat::default() == EncodingFormat::Legacy { + if (runtime.annotation(&key).await).is_ok() { + panic!("This should fail when EncodingFormat::Legacy is the default") + } + // Don't run the rest of the test when EncodingFormat::Legacy is used + return; + }; + + let result = runtime.annotation(&key).await.unwrap(); + assert!(result.is_some()); + + assert!(value == *result.unwrap()); +} + +#[rstest] +#[tokio::test] +async fn test_storage_runtime_add_annotations_list(tmpdir: tempfile::TempDir) { + let root = tmpdir.path().to_string_lossy().to_string(); + let repo = crate::storage::RepositoryHandle::from( + crate::storage::fs::FsRepository::create(root) + .await + .unwrap(), + ); + let storage = Storage::new(repo).unwrap(); + let limit: usize = 16 * 1024; + + let keep_runtime = false; + let live_layers = Vec::new(); + let mut runtime = storage + .create_named_runtime("test-with-annotation-data", keep_runtime, live_layers) + .await + .expect("failed to create runtime in storage"); + + // Test - insert data + let key = "somefield".to_string(); + let value = "some value".to_string(); + let key2 = "someotherfield".to_string(); + let value2 = "some other value".to_string(); + + let annotations: Vec = + vec![(key.clone(), value.clone()), (key2.clone(), value2.clone())]; + + assert!(runtime.add_annotations(annotations, limit).await.is_ok()); + + // Test - retrieve data both pieces of data + let result = runtime.annotation(&key).await.unwrap(); + if EncodingFormat::default() == EncodingFormat::Legacy { + assert!( + result.is_none(), + "No annotation should be found under Legacy encoding" + ); + } else { + assert!(result.is_some()); + assert!(value == *result.unwrap()); + } + + let result2 = runtime.annotation(&key2).await.unwrap(); + if EncodingFormat::default() == EncodingFormat::Legacy { + assert!( + result2.is_none(), + "No annotation should be found under Legacy encoding" + ); + } else { + assert!(result2.is_some()); + assert!(value2 == *result2.unwrap()); + } +} + +#[rstest] +#[tokio::test] +async fn test_storage_runtime_with_nested_annotation(tmpdir: tempfile::TempDir) { + // Setup the objects needed for the runtime used in the test + let root = tmpdir.path().to_string_lossy().to_string(); + let repo = crate::storage::RepositoryHandle::from( + crate::storage::fs::FsRepository::create(root) + .await + .unwrap(), + ); + + // make an annotation layer + let key = "somefield".to_string(); + let value = "somevalue".to_string(); + let annotation_value = AnnotationValue::String(value.clone()); + let layer = Layer::new_with_annotation(key.clone(), annotation_value); + repo.write_object(&layer).await.unwrap(); + + // make a platform that contains the annotation layer + let layers: Vec = vec![layer.digest().unwrap()]; + let platform = Platform::from_iter(layers); + repo.write_object(&platform.clone()).await.unwrap(); + + // put the platform into a runtime + let storage = Storage::new(repo).unwrap(); + let keep_runtime = false; + let live_layers = Vec::new(); + let mut runtime = storage + .create_named_runtime("test-with-annotation-nested", keep_runtime, live_layers) + .await + .expect("failed to create runtime in storage"); + runtime.push_digest(platform.digest().unwrap()); + + if EncodingFormat::default() == EncodingFormat::Legacy { + if (runtime.annotation(&key).await).is_ok() { + panic!("This should fail when EncodingFormat::Legacy is the default") + } + // Don't run the rest of the test when EncodingFormat::Legacy is used + return; + }; + + // Test - retrieve the data even though it is nested inside a + // platform object in the runtime. + let result = runtime.annotation(&key).await.unwrap(); + assert!(result.is_some()); + + assert!(value == *result.unwrap()); +} + +#[rstest] +#[tokio::test] +async fn test_storage_runtime_with_annotation_all(tmpdir: tempfile::TempDir) { + let root = tmpdir.path().to_string_lossy().to_string(); + let repo = crate::storage::RepositoryHandle::from( + crate::storage::fs::FsRepository::create(root) + .await + .unwrap(), + ); + let storage = Storage::new(repo).unwrap(); + let limit: usize = 16 * 1024; + + let keep_runtime = false; + let live_layers = Vec::new(); + let mut runtime = storage + .create_named_runtime("test-with-annotation-all", keep_runtime, live_layers) + .await + .expect("failed to create runtime in storage"); + + // Test - insert two distinct data values + let key = "somefield".to_string(); + let value = "somevalue".to_string(); + + assert!(runtime + .add_annotation(key.clone(), value.clone(), limit) + .await + .is_ok()); + + let key2 = "somefield2".to_string(); + let value2 = "somevalue2".to_string(); + assert!(runtime + .add_annotation(key2.clone(), value2.clone(), limit) + .await + .is_ok()); + + // Test - get all the data back out + if EncodingFormat::default() == EncodingFormat::Legacy { + if (runtime.all_annotations().await).is_ok() { + panic!("This should fail when EncodingFormat::Legacy is the default") + } + // Don't run the rest of the test when EncodingFormat::Legacy is used + return; + }; + + let result = runtime.all_annotations().await.unwrap(); + + assert!(result.len() == 2); + for (expected_key, expected_value) in [(key, value), (key2, value2)].iter() { + assert!(result.get(expected_key).is_some()); + match result.get(expected_key) { + Some(v) => { + assert!(v == expected_value); + } + None => panic!("Value missing for {expected_key} when getting all annotation"), + } + } +} + +#[rstest] +#[tokio::test] +async fn test_storage_runtime_with_nested_annotation_all(tmpdir: tempfile::TempDir) { + // setup the objects needed for the runtime used in the test + let root = tmpdir.path().to_string_lossy().to_string(); + let repo = crate::storage::RepositoryHandle::from( + crate::storage::fs::FsRepository::create(root) + .await + .unwrap(), + ); + + // make two distinct data values + let key = "somefield".to_string(); + let value = "somevalue".to_string(); + let annotation_value = AnnotationValue::String(value.clone()); + let layer = Layer::new_with_annotation(key.clone(), annotation_value); + repo.write_object(&layer.clone()).await.unwrap(); + + let key2 = "somefield2".to_string(); + let value2 = "somevalue2".to_string(); + let annotation_value2 = AnnotationValue::String(value2.clone()); + let layer2 = Layer::new_with_annotation(key2.clone(), annotation_value2); + repo.write_object(&layer2.clone()).await.unwrap(); + + // make a platform with one annotation layer + let layers: Vec = vec![layer.digest().unwrap()]; + let platform = Platform::from_iter(layers); + repo.write_object(&platform.clone()).await.unwrap(); + + // make another platform with the first platform and the other + // annotation layer. this second platform is in the runtime + let layers2: Vec = vec![platform.digest().unwrap(), layer2.digest().unwrap()]; + let platform2 = Platform::from_iter(layers2); + repo.write_object(&platform2.clone()).await.unwrap(); + + // finally set up the runtime + let storage = Storage::new(repo).unwrap(); + + let keep_runtime = false; + let live_layers = Vec::new(); + let mut runtime = storage + .create_named_runtime("test-with-annotation-all-nested", keep_runtime, live_layers) + .await + .expect("failed to create runtime in storage"); + runtime.push_digest(platform2.digest().unwrap()); + + // Test - get all the data back out even thought it is nested at + // different levels in different platform objects in the runtime + if EncodingFormat::default() == EncodingFormat::Legacy { + if (runtime.all_annotations().await).is_ok() { + panic!("This should fail when EncodingFormat::Legacy is the default") + } + // Don't run the rest of the test when EncodingFormat::Legacy is used + return; + }; + + let result = runtime.all_annotations().await.unwrap(); + assert!(result.len() == 2); + for (expected_key, expected_value) in [(key, value), (key2, value2)].iter() { + assert!(result.get(expected_key).is_some()); + match result.get(expected_key) { + Some(v) => { + assert!(v == expected_value); + } + None => panic!("Value missing for {expected_key} when getting all annotations"), + } + } +} + #[rstest] #[tokio::test] async fn test_storage_remove_runtime(tmpdir: tempfile::TempDir) { diff --git a/crates/spfs/src/storage/fs/renderer_unix.rs b/crates/spfs/src/storage/fs/renderer_unix.rs index a8d7d8fef1..58718ffb6a 100644 --- a/crates/spfs/src/storage/fs/renderer_unix.rs +++ b/crates/spfs/src/storage/fs/renderer_unix.rs @@ -261,17 +261,19 @@ where .map_err(|err| err.wrap("resolve stack to layers"))?; let mut futures = futures::stream::FuturesOrdered::new(); for layer in layers { - let digest = *layer.manifest(); - let fut = self - .repo - .read_manifest(digest) - .map_err(move |err| err.wrap(format!("read manifest {digest}"))) - .and_then(move |manifest| async move { - self.render_manifest(&manifest, render_type) - .await - .map_err(move |err| err.wrap(format!("render manifest {digest}"))) - }); - futures.push_back(fut); + if let Some(manifest_digest) = layer.manifest() { + let digest = *manifest_digest; + let fut = self + .repo + .read_manifest(digest) + .map_err(move |err| err.wrap(format!("read manifest {digest}"))) + .and_then(move |manifest| async move { + self.render_manifest(&manifest, render_type) + .await + .map_err(move |err| err.wrap(format!("render manifest {digest}"))) + }); + futures.push_back(fut); + } } futures.try_collect().await } @@ -293,7 +295,9 @@ where let layers = crate::resolve::resolve_stack_to_layers_with_repo(&stack, self.repo).await?; let mut manifests = Vec::with_capacity(layers.len()); for layer in layers { - manifests.push(self.repo.read_manifest(*layer.manifest()).await?); + if let Some(manifest_digest) = layer.manifest() { + manifests.push(self.repo.read_manifest(*manifest_digest).await?); + } } let mut manifest = tracking::Manifest::default(); for next in manifests.into_iter() { diff --git a/crates/spfs/src/storage/fs/renderer_win.rs b/crates/spfs/src/storage/fs/renderer_win.rs index b0ef948a3e..3e453025a3 100644 --- a/crates/spfs/src/storage/fs/renderer_win.rs +++ b/crates/spfs/src/storage/fs/renderer_win.rs @@ -240,17 +240,19 @@ where .map_err(|err| err.wrap("resolve stack to layers"))?; let mut futures = futures::stream::FuturesOrdered::new(); for layer in layers { - let digest = *layer.manifest(); - let fut = self - .repo - .read_manifest(digest) - .map_err(move |err| err.wrap(format!("read manifest {digest}"))) - .and_then(move |manifest| async move { - self.render_manifest(&manifest, render_type) - .await - .map_err(move |err| err.wrap(format!("render manifest {digest}"))) - }); - futures.push_back(fut); + if let Some(manifest_digest) = layer.manifest() { + let digest = *manifest_digest; + let fut = self + .repo + .read_manifest(digest) + .map_err(move |err| err.wrap(format!("read manifest {digest}"))) + .and_then(move |manifest| async move { + self.render_manifest(&manifest, render_type) + .await + .map_err(move |err| err.wrap(format!("render manifest {digest}"))) + }); + futures.push_back(fut); + } } futures.try_collect().await } @@ -272,7 +274,9 @@ where let layers = crate::resolve::resolve_stack_to_layers_with_repo(&stack, self.repo).await?; let mut manifests = Vec::with_capacity(layers.len()); for layer in layers { - manifests.push(self.repo.read_manifest(*layer.manifest()).await?); + if let Some(manifest_digest) = layer.manifest() { + manifests.push(self.repo.read_manifest(*manifest_digest).await?); + } } let mut manifest = tracking::Manifest::default(); for next in manifests.into_iter() { diff --git a/crates/spfs/src/sync.rs b/crates/spfs/src/sync.rs index f645481d73..3f0bda77ff 100644 --- a/crates/spfs/src/sync.rs +++ b/crates/spfs/src/sync.rs @@ -9,6 +9,7 @@ use once_cell::sync::OnceCell; use progress_bar_derive_macro::ProgressBar; use tokio::sync::Semaphore; +use crate::graph::AnnotationValue; use crate::prelude::*; use crate::{encoding, graph, storage, tracking, Error, Result}; @@ -316,10 +317,34 @@ where } self.reporter.visit_layer(&layer); - let manifest = self.src.read_manifest(*layer.manifest()).await?; - let result = self.sync_manifest(manifest).await?; + + let manifest_result = if let Some(manifest_digest) = layer.manifest() { + let manifest = self.src.read_manifest(*manifest_digest).await?; + self.sync_manifest(manifest).await? + } else { + SyncManifestResult::Skipped + }; + let annotations = layer.annotations(); + let annotation_results = if annotations.is_empty() { + vec![SyncObjectResult::Annotation( + SyncAnnotationResult::InternalValue, + )] + } else { + let mut results = Vec::with_capacity(annotations.len()); + for entry in annotations { + results.push(SyncObjectResult::Annotation( + self.sync_annotation(entry.into()).await?, + )); + } + results + }; + self.dest.write_object(&layer).await?; - let res = SyncLayerResult::Synced { layer, result }; + + let mut results = vec![SyncObjectResult::Manifest(manifest_result)]; + results.extend(annotation_results); + + let res = SyncLayerResult::Synced { layer, results }; self.reporter.synced_layer(&res); Ok(res) } @@ -360,6 +385,28 @@ where Ok(res) } + async fn sync_annotation( + &self, + annotation: graph::Annotation<'_>, + ) -> Result { + match annotation.value() { + AnnotationValue::String(_) => Ok(SyncAnnotationResult::InternalValue), + AnnotationValue::Blob(digest) => { + if !self.processed_digests.insert(digest) { + return Ok(SyncAnnotationResult::Duplicate); + } + self.reporter.visit_annotation(&annotation); + let sync_result = self.sync_digest(digest).await?; + let res = SyncAnnotationResult::Synced { + digest, + result: Box::new(sync_result), + }; + self.reporter.synced_annotation(&res); + Ok(res) + } + } + } + async fn sync_entry(&self, entry: graph::Entry<'_>) -> Result { if !entry.kind().is_blob() { return Ok(SyncEntryResult::Skipped); @@ -534,6 +581,12 @@ pub trait SyncReporter: Send + Sync { /// Called when a manifest has finished syncing fn synced_manifest(&self, _result: &SyncManifestResult) {} + /// Called when an annotation has been identified to sync + fn visit_annotation(&self, _annotation: &graph::Annotation) {} + + /// Called when an annotation has finished syncing + fn synced_annotation(&self, _result: &SyncAnnotationResult) {} + /// Called when an entry has been identified to sync fn visit_entry(&self, _entry: &graph::Entry<'_>) {} @@ -743,6 +796,7 @@ pub enum SyncObjectResult { Layer(SyncLayerResult), Blob(SyncBlobResult), Manifest(SyncManifestResult), + Annotation(SyncAnnotationResult), } impl SyncObjectResult { @@ -758,6 +812,7 @@ impl SyncObjectResult { R::Layer(res) => res.summary(), R::Blob(res) => res.summary(), R::Manifest(res) => res.summary(), + R::Annotation(res) => res.summary(), } } } @@ -797,7 +852,7 @@ pub enum SyncLayerResult { /// The layer was synced Synced { layer: graph::Layer, - result: SyncManifestResult, + results: Vec, }, } @@ -805,8 +860,8 @@ impl SyncLayerResult { pub fn summary(&self) -> SyncSummary { match self { Self::Skipped | Self::Duplicate => SyncSummary::skipped_one_object(), - Self::Synced { result, .. } => { - let mut summary = result.summary(); + Self::Synced { results, .. } => { + let mut summary = results.iter().map(|r| r.summary()).sum(); summary += SyncSummary::synced_one_object(); summary } @@ -840,6 +895,31 @@ impl SyncManifestResult { } } +#[derive(Debug)] +pub enum SyncAnnotationResult { + /// The annotation did not need to be synced + InternalValue, + /// The annotation was already synced in this session + Duplicate, + /// The annotation was stored in a blob and was synced + Synced { + digest: encoding::Digest, + result: Box, + }, +} + +impl SyncAnnotationResult { + pub fn summary(&self) -> SyncSummary { + match self { + Self::InternalValue | Self::Duplicate => SyncSummary::default(), + Self::Synced { + digest: _, + result: _, + } => SyncSummary::synced_one_object(), + } + } +} + #[derive(Debug)] pub enum SyncEntryResult { /// The entry did not need to be synced diff --git a/crates/spk-build/src/build/binary.rs b/crates/spk-build/src/build/binary.rs index bbd8e824c5..71214d581b 100644 --- a/crates/spk-build/src/build/binary.rs +++ b/crates/spk-build/src/build/binary.rs @@ -843,8 +843,16 @@ where .with_path_filter(collected_changes.as_slice()) .commit_layer(&mut runtime) .await?; + + let manifest_digest = match layer.manifest() { + Some(d) => d, + None => { + return Err(Error::String("Collected changes became a layer with no manifest. This should not happen during a binary build. Please report this as a bug".to_string())); + } + }; + let collected_layer = repo - .read_manifest(*layer.manifest()) + .read_manifest(*manifest_digest) .await? .to_tracking_manifest(); let manifests = split_manifest_by_component( diff --git a/crates/spk-build/src/build/binary_test.rs b/crates/spk-build/src/build/binary_test.rs index 7c6401b5e5..dbe1bdde08 100644 --- a/crates/spk-build/src/build/binary_test.rs +++ b/crates/spk-build/src/build/binary_test.rs @@ -597,11 +597,17 @@ async fn test_build_package_source_cleanup() { let config = spfs::get_config().unwrap(); let repo = config.get_local_repository().await.unwrap(); let layer = repo.read_layer(digest).await.unwrap(); + + let manifest_digest = match layer.manifest() { + None => panic!("This layer should have a manifest digest!"), + Some(d) => d, + }; let manifest = repo - .read_manifest(*layer.manifest()) + .read_manifest(*manifest_digest) .await .unwrap() .to_tracking_manifest(); + let entry = manifest.get_path(data_path(src_pkg.ident())); assert!( entry.is_none() || entry.unwrap().entries.is_empty(), @@ -687,8 +693,13 @@ async fn test_build_filters_reset_files() { let config = spfs::get_config().unwrap(); let repo = config.get_local_repository().await.unwrap(); let layer = repo.read_layer(digest).await.unwrap(); + + let manifest_digest = match layer.manifest() { + None => panic!("This layer should have a manifest digest!"), + Some(d) => d, + }; let manifest = repo - .read_manifest(*layer.manifest()) + .read_manifest(*manifest_digest) .await .unwrap() .to_tracking_manifest(); diff --git a/crates/spk-cli/cmd-build/src/cmd_build_test/mod.rs b/crates/spk-cli/cmd-build/src/cmd_build_test/mod.rs index b9b65b01e1..abeb56cc46 100644 --- a/crates/spk-cli/cmd-build/src/cmd_build_test/mod.rs +++ b/crates/spk-cli/cmd-build/src/cmd_build_test/mod.rs @@ -474,7 +474,11 @@ build: let layer = repo.read_layer(digest).await.unwrap(); let manifest = repo - .read_manifest(*layer.manifest()) + .read_manifest( + *layer + .manifest() + .expect("Layer should have a manifest in this test"), + ) .await .unwrap() .to_tracking_manifest(); @@ -574,7 +578,11 @@ build: let layer = repo.read_layer(digest).await.unwrap(); let manifest = repo - .read_manifest(*layer.manifest()) + .read_manifest( + *layer + .manifest() + .expect("Layer should have a manifest in this test"), + ) .await .unwrap() .to_tracking_manifest(); diff --git a/crates/spk-cli/cmd-du/src/cmd_du.rs b/crates/spk-cli/cmd-du/src/cmd_du.rs index c5b0962921..a31a27b9e5 100644 --- a/crates/spk-cli/cmd-du/src/cmd_du.rs +++ b/crates/spk-cli/cmd-du/src/cmd_du.rs @@ -354,8 +354,14 @@ impl Du { } } Enum::Layer(object) => { - item = repo.read_object(*object.manifest()).await?; + let manifest_digest = match object.manifest() { + None => continue, + Some(d) => d, + }; + item = repo.read_object(*manifest_digest).await?; next_iter_objects.push(item); + // TODO: what about annotation data stored in a blob, + // that kind of data isn't counted here? } Enum::Manifest(object) => { let tracking_manifest = object.to_tracking_manifest(); diff --git a/crates/spk-exec/src/exec.rs b/crates/spk-exec/src/exec.rs index 2592f64f05..1cb535088d 100644 --- a/crates/spk-exec/src/exec.rs +++ b/crates/spk-exec/src/exec.rs @@ -48,7 +48,11 @@ impl ResolvedLayers { let object = repo.read_object(resolved_layer.digest).await?; match object.into_enum() { Enum::Layer(obj) => { - match repo.read_object(*obj.manifest()).await?.into_enum() { + let manifest_digest = match obj.manifest() { + None => continue, + Some(m) => m + }; + match repo.read_object(*manifest_digest).await?.into_enum() { Enum::Manifest(obj) => obj, _ => continue, } diff --git a/crates/spk-storage/src/fixtures.rs b/crates/spk-storage/src/fixtures.rs index ec84068f10..0fd11c3649 100644 --- a/crates/spk-storage/src/fixtures.rs +++ b/crates/spk-storage/src/fixtures.rs @@ -75,7 +75,7 @@ impl std::ops::Deref for TempRepo { /// Returns an empty spfs layer object for easy testing pub fn empty_layer() -> spfs::graph::Layer { - spfs::graph::Layer::new(spfs::encoding::EMPTY_DIGEST.into()) + spfs::graph::Layer::new(Default::default()) } /// Returns the digest for an empty spfs layer. diff --git a/crates/spk-storage/src/storage/runtime.rs b/crates/spk-storage/src/storage/runtime.rs index f6f4987a51..8b8b44fd8e 100644 --- a/crates/spk-storage/src/storage/runtime.rs +++ b/crates/spk-storage/src/storage/runtime.rs @@ -490,12 +490,14 @@ async fn find_layer_by_filename>(path: S) -> Result>( let layers = spfs::resolve_stack_to_layers(&runtime.status.stack, Some(&repo)).await?; for layer in layers.iter().rev() { + let manifest_digest = match layer.manifest() { + None => continue, + Some(d) => d, + }; + let manifest = repo - .read_manifest(*layer.manifest()) + .read_manifest(*manifest_digest) .await? .to_tracking_manifest();