From 88d392089f94a4462edc1e997ae7598299586ec3 Mon Sep 17 00:00:00 2001 From: David Gilligan-Cook Date: Mon, 24 Jul 2023 11:13:53 -0700 Subject: [PATCH] Adds external key-value string data storage to runtime via ExternalData objects in Layers. Adds --external-data argument to spfs run/shell, --get and --get-all arguments to spfs info, spfs runtime info, and adds support for reading the value from an ExternalData using spfs read. Signed-off-by: David Gilligan-Cook --- Cargo.lock | 4 + crates/spfs-cli/cmd-enter/src/cmd_enter.rs | 9 + crates/spfs-cli/cmd-render/src/cmd_render.rs | 9 +- crates/spfs-cli/common/Cargo.toml | 1 + crates/spfs-cli/common/src/args.rs | 59 ++++- crates/spfs-cli/common/src/lib.rs | 2 +- crates/spfs-cli/main/Cargo.toml | 5 + crates/spfs-cli/main/src/cmd_info.rs | 22 +- crates/spfs-cli/main/src/cmd_run.rs | 122 ++++++++- crates/spfs-cli/main/src/cmd_run_test.rs | 198 ++++++++++++++ crates/spfs-cli/main/src/cmd_runtime_info.rs | 10 + crates/spfs-cli/main/src/cmd_shell.rs | 5 + crates/spfs-cli/main/src/fixtures.rs | 10 + crates/spfs-proto/schema/spfs.fbs | 23 +- crates/spfs/src/check.rs | 73 +++++- crates/spfs/src/config.rs | 15 ++ crates/spfs/src/find_path.rs | 21 +- crates/spfs/src/graph/external_data.rs | 182 +++++++++++++ crates/spfs/src/graph/external_data_test.rs | 28 ++ crates/spfs/src/graph/layer.rs | 131 +++++++++- crates/spfs/src/graph/mod.rs | 7 + crates/spfs/src/graph/object.rs | 22 +- crates/spfs/src/proto/conversions.rs | 67 ++++- crates/spfs/src/proto/defs/types.proto | 15 ++ crates/spfs/src/resolve.rs | 44 ++-- crates/spfs/src/runtime/mod.rs | 1 + crates/spfs/src/runtime/storage.rs | 211 ++++++++++++++- crates/spfs/src/runtime/storage_test.rs | 246 ++++++++++++++++++ crates/spfs/src/storage/fs/renderer_unix.rs | 28 +- crates/spfs/src/sync.rs | 88 ++++++- crates/spk-build/src/build/binary.rs | 10 +- crates/spk-build/src/build/binary_test.rs | 15 +- .../spk-cli/cmd-build/src/cmd_build_test.rs | 12 +- crates/spk-cli/cmd-du/src/cmd_du.rs | 8 +- crates/spk-exec/src/exec.rs | 6 +- crates/spk-storage/src/fixtures.rs | 2 +- crates/spk-storage/src/storage/archive.rs | 8 - crates/spk-storage/src/storage/runtime.rs | 21 +- 38 files changed, 1643 insertions(+), 97 deletions(-) create mode 100644 crates/spfs-cli/main/src/cmd_run_test.rs create mode 100644 crates/spfs-cli/main/src/fixtures.rs create mode 100644 crates/spfs/src/graph/external_data.rs create mode 100644 crates/spfs/src/graph/external_data_test.rs diff --git a/Cargo.lock b/Cargo.lock index 9f7cf6091e..8053c4d1f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3459,6 +3459,7 @@ dependencies = [ "sentry-miette", "sentry-tracing", "serde_json", + "serde_yaml 0.9.27", "spfs", "strip-ansi-escapes", "syslog-tracing", @@ -3530,10 +3531,13 @@ dependencies = [ "number_prefix", "procfs", "relative-path", + "rstest 0.18.2", "serde_json", + "serde_yaml 0.9.27", "spfs", "spfs-cli-common", "strum", + "tempfile", "tokio", "tokio-stream", "tonic", diff --git a/crates/spfs-cli/cmd-enter/src/cmd_enter.rs b/crates/spfs-cli/cmd-enter/src/cmd_enter.rs index a278cda8ff..58deabd5fd 100644 --- a/crates/spfs-cli/cmd-enter/src/cmd_enter.rs +++ b/crates/spfs-cli/cmd-enter/src/cmd_enter.rs @@ -175,6 +175,15 @@ impl CmdEnter { // For example, the monitor may need to run fusermount to clean up // fuse filesystems within the runtime before shutting down. tracing::debug!("initializing runtime {owned:#?}"); + tracing::debug!( + "runtime stack: {:?}", + owned + .status + .stack + .iter_bottom_up() + .map(|d| d.to_string()) + .collect::>() + ); let start_time = Instant::now(); let render_summary = spfs::initialize_runtime(&mut owned).await?; self.report_render_summary(render_summary, start_time.elapsed().as_secs_f64()); diff --git a/crates/spfs-cli/cmd-render/src/cmd_render.rs b/crates/spfs-cli/cmd-render/src/cmd_render.rs index 51e97c69c5..a75672d8be 100644 --- a/crates/spfs-cli/cmd-render/src/cmd_render.rs +++ b/crates/spfs-cli/cmd-render/src/cmd_render.rs @@ -156,7 +156,14 @@ impl CmdRender { &render_summary_reporter as &dyn spfs::storage::fs::RenderReporter, ]), ); - let stack = layers.into_iter().map(|l| *l.manifest()).collect(); + let stack = layers + .into_iter() + .filter_map(|l| match l.manifest() { + None => None, + Some(m) => Some(*m), + }) + .collect(); + tracing::trace!("stack: {:?}", stack); renderer .render(&stack, self.strategy) .await diff --git a/crates/spfs-cli/common/Cargo.toml b/crates/spfs-cli/common/Cargo.toml index 7216df6e81..91f45cb99e 100644 --- a/crates/spfs-cli/common/Cargo.toml +++ b/crates/spfs-cli/common/Cargo.toml @@ -27,6 +27,7 @@ sentry = { workspace = true, optional = true } sentry-miette = { workspace = true, optional = true } sentry-tracing = { workspace = true, optional = true } serde_json = { version = "1.0.57", optional = true } +serde_yaml = { workspace = true } spfs = { path = "../../spfs" } strip-ansi-escapes = { workspace = true, optional = true } syslog-tracing = "0.2.0" diff --git a/crates/spfs-cli/common/src/args.rs b/crates/spfs-cli/common/src/args.rs index 3a9bed28b4..622a57a548 100644 --- a/crates/spfs-cli/common/src/args.rs +++ b/crates/spfs-cli/common/src/args.rs @@ -7,9 +7,10 @@ use std::panic::catch_unwind; #[cfg(feature = "sentry")] use std::sync::Mutex; -use miette::Error; +use miette::{Error, IntoDiagnostic, Result, WrapErr}; #[cfg(feature = "sentry")] use once_cell::sync::OnceCell; +use spfs::io::Pluralize; use spfs::storage::LocalRepository; use tracing_subscriber::prelude::*; @@ -416,6 +417,62 @@ impl Logging { } } +/// Command line flags for viewing external data in a runtime +#[derive(Debug, Clone, clap::Args)] +pub struct ExternalDataViewing { + /// Output the data value for the given external data key(s) from + /// the active runtime. Each value is printed on its own line + /// without its key. + #[clap(long, alias = "external_data")] + pub get: Option>, + + /// Output all the external data keys and values from the active + /// runtime as a yaml dictionary + #[clap(long, alias = "all_external_data")] + pub get_all: bool, +} + +impl ExternalDataViewing { + /// Display external data values based on the command line arguments + pub async fn print_data(&self, runtime: &spfs::runtime::Runtime) -> Result<()> { + if self.get_all { + let data = runtime.all_external_data().await?; + let keys = data + .keys() + .map(ToString::to_string) + .collect::>(); + let num_keys = keys.len(); + tracing::debug!( + "{num_keys} external data {}: {}", + "key".pluralize(num_keys), + keys.join(", ") + ); + println!( + "{}", + serde_yaml::to_string(&data) + .into_diagnostic() + .wrap_err("Failed to generate yaml output")? + ); + } else if let Some(keys) = &self.get { + tracing::debug!("--get these keys: {}", keys.join(", ")); + for key in keys.iter() { + match runtime.external_data(key).await? { + Some(value) => { + tracing::debug!("{key} = {value}"); + println!("{value}"); + } + None => { + tracing::warn!("No external data stored under: {key}"); + println!(); + } + } + } + } + + Ok(()) + } +} + /// Trait all spfs cli command parsers must implement to provide the /// name of the spfs command that has been parsed. This method will be /// called when configuring sentry. diff --git a/crates/spfs-cli/common/src/lib.rs b/crates/spfs-cli/common/src/lib.rs index 3027811d04..768e07cac3 100644 --- a/crates/spfs-cli/common/src/lib.rs +++ b/crates/spfs-cli/common/src/lib.rs @@ -11,6 +11,6 @@ pub mod __private { pub use {libc, spfs}; } -pub use args::{capture_if_relevant, CommandName, Logging, Render, Sync}; +pub use args::{capture_if_relevant, CommandName, ExternalDataViewing, Logging, Render, Sync}; #[cfg(feature = "sentry")] pub use args::{configure_sentry, shutdown_sentry}; diff --git a/crates/spfs-cli/main/Cargo.toml b/crates/spfs-cli/main/Cargo.toml index 639efc9ac3..cb153a9fe6 100644 --- a/crates/spfs-cli/main/Cargo.toml +++ b/crates/spfs-cli/main/Cargo.toml @@ -32,6 +32,7 @@ nix = { workspace = true } number_prefix = "*" # we hope to match versions with indicatif relative-path = "1.3" serde_json = { workspace = true } +serde_yaml = { workspace = true } spfs = { path = "../../spfs" } spfs-cli-common = { path = "../common" } strum = { workspace = true, features = ["derive"] } @@ -52,3 +53,7 @@ features = [ "Win32_System_SystemInformation", "Win32_System_Threading", ] + +[dev-dependencies] +rstest = { workspace = true } +tempfile = { workspace = true } diff --git a/crates/spfs-cli/main/src/cmd_info.rs b/crates/spfs-cli/main/src/cmd_info.rs index 2645f0d801..87ca0395a3 100644 --- a/crates/spfs-cli/main/src/cmd_info.rs +++ b/crates/spfs-cli/main/src/cmd_info.rs @@ -7,6 +7,7 @@ use colored::*; use miette::Result; use spfs::env::SPFS_DIR; use spfs::find_path::ObjectPathEntry; +use spfs::graph::ExternalData; use spfs::io::{self, DigestFormat, Pluralize}; use spfs::prelude::*; use spfs::{self}; @@ -18,6 +19,9 @@ pub struct CmdInfo { #[clap(flatten)] logging: cli::Logging, + #[clap(flatten)] + external_data: cli::ExternalDataViewing, + /// Lists file sizes in human readable format #[clap(long, short = 'H')] human_readable: bool, @@ -129,8 +133,20 @@ impl CmdInfo { println!( " {} {}", "manifest:".bright_blue(), - self.format_digest(*obj.manifest(), repo).await? + match obj.manifest() { + None => "None".to_string(), + Some(manifest_digest) => self.format_digest(*manifest_digest, repo).await?, + } ); + + if let Some(data) = obj.external_data() { + let external_data: ExternalData = data.into(); + println!(" {}", "external data:".bright_blue()); + println!(" {} {}", "key:".bright_blue(), external_data.key()); + println!(" {} {:?}", "value:".bright_blue(), external_data.value()); + } else { + println!(" {} none", "external data:".bright_blue()); + } } Enum::Manifest(obj) => { @@ -186,6 +202,10 @@ impl CmdInfo { async fn print_global_info(&self, repo: &spfs::storage::RepositoryHandle) -> Result<()> { let runtime = spfs::active_runtime().await?; + if self.external_data.get_all || self.external_data.get.is_some() { + return self.external_data.print_data(&runtime).await; + } + println!("{}:", "Active Runtime".green()); println!(" {}: {}", "id".bright_blue(), runtime.name()); println!(" {}: {}", "editable".bright_blue(), runtime.status.editable); diff --git a/crates/spfs-cli/main/src/cmd_run.rs b/crates/spfs-cli/main/src/cmd_run.rs index b33e4adf6c..fa84280939 100644 --- a/crates/spfs-cli/main/src/cmd_run.rs +++ b/crates/spfs-cli/main/src/cmd_run.rs @@ -2,15 +2,112 @@ // SPDX-License-Identifier: Apache-2.0 // https://github.com/imageworks/spk +use std::collections::BTreeMap; use std::ffi::OsString; +use std::io; use std::time::Instant; use clap::{ArgGroup, Args}; -use miette::{Context, Result}; +use miette::{miette, Context, IntoDiagnostic, Result}; +use spfs::graph::object::EncodingFormat; use spfs::prelude::*; +use spfs::runtime::KeyValuePair; +use spfs::storage::FromConfig; use spfs::tracking::EnvSpec; use spfs_cli_common as cli; +#[cfg(test)] +#[path = "./cmd_run_test.rs"] +mod cmd_run_test; +#[cfg(test)] +#[path = "./fixtures.rs"] +mod fixtures; + +#[derive(Args, Clone, Debug)] +pub struct ExternalData { + /// Adds extra external key-value string data to the new runtime. + /// + /// This allows external processes to store arbitrary data in the + /// runtimes they create. This is most useful with durable runtimes. + /// The data can be retrieved by running `spfs runtime info` or + /// `spfs info` and using the `--get ` or `--get-all` options + /// + /// External data is specified as key-value string pairs separated + /// by either an equals sign or colon (--external-data name=value + /// --external-data other:value). Multiple pairs of external data + /// can also be specified at once in yaml or json format + /// (--external-data '{name: value, other: value}'). + /// + /// External data can also be given in a json or yaml file, by + /// using the `--external-data-file ` argument. If given, + /// `--external-data` arguments will supersede anything given in + /// external data files. + /// + /// If the same key is used more than once, the last key-value pair + /// will override the earlier values for the same key. + #[clap(long, value_name = "KEY:VALUE")] + pub external_data: Vec, + + /// Specify external extra key-value data from a json or yaml file + /// (see --external-data) + #[clap(long)] + pub external_data_file: Vec, +} + +impl ExternalData { + /// Returns a list of external data key-value pairs gathered from + /// all the external data related command line arguments. The same + /// keys, and values, can appear multiple times in the list if + /// specified multiple times in various command line arguments. + pub fn get_data(&self) -> Result> { + let mut data: Vec = Vec::new(); + + for filename in self.external_data_file.iter() { + let reader: Box = + if Ok("-".to_string()) == filename.clone().into_os_string().into_string() { + // Treat '-' as "read from stdin" + Box::new(io::stdin()) + } else { + Box::new( + std::fs::File::open(filename) + .into_diagnostic() + .wrap_err(format!("Failed to open external data file: {filename:?}"))?, + ) + }; + let external_data: BTreeMap = serde_yaml::from_reader(reader) + .into_diagnostic() + .wrap_err(format!( + "Failed to parse as external data key-value pairs: {filename:?}" + ))?; + data.extend(external_data); + } + + for pair in self.external_data.iter() { + let pair = pair.trim(); + if pair.starts_with('{') { + let given: BTreeMap = serde_yaml::from_str(pair) + .into_diagnostic() + .wrap_err("--external-data value looked like yaml, but could not be parsed")?; + data.extend(given); + continue; + } + + let (name, value) = pair + .split_once('=') + .or_else(|| pair.split_once(':')) + .ok_or_else(|| { + miette!( + "Invalid option: -external-data {pair} (should be in the form name=value)" + ) + })?; + + data.push((name.to_string(), value.to_string())); + } + + Ok(data) + } +} + /// Run a program in a configured spfs environment #[derive(Debug, Args)] #[clap(group( @@ -51,6 +148,9 @@ pub struct CmdRun { #[clap(long, value_name = "RUNTIME_NAME")] pub rerun: Option, + #[clap(flatten)] + pub external_data: ExternalData, + /// The tag or id of the desired runtime /// /// Use '-' to request an empty environment @@ -146,6 +246,26 @@ impl CmdRun { ); } + let data = self.external_data.get_data()?; + if !data.is_empty() { + if config.storage.encoding_format == EncodingFormat::Legacy { + return Err(spfs::Error::String( + "Cannot use '--external-data' when spfs is configured to use the 'Legacy' encoding format".to_string(), + ) + .into()); + } + + // These are added in reverse order so that the ones + // specified later on the command line will take precedence. + for (key, value) in data.into_iter().rev() { + tracing::trace!("ex data being added: {key}: {value}"); + runtime + .add_external_data(key, value, config.filesystem.external_data_size_limit) + .await?; + } + tracing::trace!(" with external data: {:?}", runtime); + } + let start_time = Instant::now(); runtime.config.mount_backend = config.filesystem.backend; runtime.config.secondary_repositories = config.get_secondary_runtime_repositories(); diff --git a/crates/spfs-cli/main/src/cmd_run_test.rs b/crates/spfs-cli/main/src/cmd_run_test.rs new file mode 100644 index 0000000000..f9dd3f671f --- /dev/null +++ b/crates/spfs-cli/main/src/cmd_run_test.rs @@ -0,0 +1,198 @@ +// Copyright (c) Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk + +use std::fs; + +use rstest::rstest; + +use super::fixtures::*; +use super::ExternalData; + +// Test - --extra-data field1:value1 --extra-data field2:value2 +// Test - --extra-data field1=value1 --extra-data field2=value2 +// Test - --extra-data field1:value1 --extra-data field2=value2 +// Test - --extra-data field1=value1 --extra-data field2:value2 +// Test - --extra-data '{ field1: value1, field2: value2 }' +#[rstest] +#[case(vec!["field1:value1".to_string(), "field2:value2".to_string()])] +#[case(vec!["field1=value1".to_string(), "field2=value2".to_string()])] +#[case(vec!["field1:value1".to_string(), "field2=value2".to_string()])] +#[case(vec!["field1=value1".to_string(), "field2:value2".to_string()])] +#[case(vec!["{field1: value1, field2: value2}".to_string()])] +fn test_cmd_run_create_external_data(#[case] values: Vec) { + // Setup some data for the key value pairs + let field1 = "field1".to_string(); + let field2 = "field2".to_string(); + let value1 = "value1".to_string(); + let value2 = "value2".to_string(); + + let filenames = Vec::new(); + + // Test - --external-data field1:value1 --external-data field2:value2 + let data = ExternalData { + external_data: values, + external_data_file: filenames, + }; + + let result = data + .get_data() + .expect("unable to parse cmd_run --external-data values"); + assert!(!result.is_empty()); + assert!(result.len() == 2); + println!("r0: {:?}", result[0]); + assert!(result[0] == (field1, value1)); + println!("r1: {:?}", result[1]); + assert!(result[1] == (field2, value2)); +} + +#[rstest] +#[should_panic] +#[case(vec!["field1,value1".to_string(), "field2/value2".to_string()])] +#[should_panic] +#[case(vec!["field1 value1".to_string(), "field2 value2".to_string()])] +#[should_panic] +#[case(vec!["{field1: value1, field2: - value2 - value1}".to_string()])] +fn test_cmd_run_create_external_data_invalid(#[case] invalid_values: Vec) { + let filenames = Vec::new(); + + // Test - --external-data with_an_invalid_argument_value + let data = ExternalData { + external_data: invalid_values, + external_data_file: filenames, + }; + + let _result = data + .get_data() + .expect("unable to parse cmd_run --exteranl-data values"); +} + +#[rstest] +fn test_cmd_run_create_external_data_from_file(tmpdir: tempfile::TempDir) { + // Setup some data for the key value pairs + let field1 = "field1".to_string(); + let field2 = "field2".to_string(); + let value1 = "value1".to_string(); + let value2 = "value2".to_string(); + let external_data = format!("{field1}: {value1}\n{field2}: {value2}\n"); + + let filename = tmpdir.path().join("filename.yaml"); + fs::write(filename.clone(), external_data) + .expect("Unable to write external data to file during setup"); + let filenames = vec![filename]; + + let values = Vec::new(); + + // Test - --external-data-file filename.yaml + let data = ExternalData { + external_data: values, + external_data_file: filenames, + }; + + let result = data + .get_data() + .expect("unable to parse cmd_run --external-data values"); + assert!(!result.is_empty()); + assert!(result.len() == 2); + assert!(result[0] == (field1, value1)); + assert!(result[1] == (field2, value2)); +} + +#[rstest] +#[should_panic] +fn test_cmd_run_create_external_data_from_file_not_exist(tmpdir: tempfile::TempDir) { + // Setup a file name that does not exist + let filename = tmpdir.path().join("nosuchfile.yaml"); + let filenames = vec![filename]; + + let values = Vec::new(); + + // Test - --external-data-file nosuchfile.yaml + let data = ExternalData { + external_data: values, + external_data_file: filenames, + }; + + let _result = data + .get_data() + .expect("unable to parse cmd_run --external-data values"); +} + +#[rstest] +#[should_panic] +fn test_cmd_run_create_external_data_from_file_invalid_keyvalues(tmpdir: tempfile::TempDir) { + // Setup some data for the key value pairs + let field1 = "field1".to_string(); + let field2 = "field2".to_string(); + let value1 = "value1".to_string(); + let value2 = "value2".to_string(); + let external_data = format!("{field1}: {value1}\n{field2}:\n - {value2}\n - {value1}\n"); + + let filename = tmpdir.path().join("filename.yaml"); + fs::write(filename.clone(), external_data) + .expect("Unable to write external data to file during setup"); + let filenames = vec![filename]; + + let values = Vec::new(); + + // Test - --external-data-file filename.yaml that contains more than key-value string pairs + let data = ExternalData { + external_data: values, + external_data_file: filenames, + }; + + let _result = data + .get_data() + .expect("unable to parse cmd_run --external-data values"); +} + +#[rstest] +fn test_cmd_run_create_external_data_all(tmpdir: tempfile::TempDir) { + // Setup some data for the key value pairs + let field1 = "field1".to_string(); + let field2 = "field2".to_string(); + let field3 = "field3".to_string(); + let field4 = "field4".to_string(); + + let value1 = "value1".to_string(); + let value2 = "value2".to_string(); + let value3 = "value3".to_string(); + let value4 = "value4".to_string(); + + let values: Vec = vec![ + format!("{field1}:{value1}"), + format!("{field2}={value2}"), + "{field3: value3, field4: value4}".to_string(), + ]; + + let external_data = format!("{field4}: {value4}\n{field3}: {value3}\n"); + + let filename = tmpdir.path().join("filename.yaml"); + fs::write(filename.clone(), external_data) + .expect("Unable to write external data to file during setup"); + let filenames = vec![filename]; + + // Test - --external-data field1:value1 --external-data field2=value2 + // --external-data '{ field1: value1, field2: value2 }' + // --external-data-file filename.yaml + let data = ExternalData { + external_data: values, + external_data_file: filenames, + }; + + let result = data + .get_data() + .expect("unable to parse cmd_run --external-data values"); + + assert!(!result.is_empty()); + assert!(result.len() == 6); + // from --external-data-file filename.taml + assert!(result[0] == (field3.clone(), value3.clone())); + assert!(result[1] == (field4.clone(), value4.clone())); + // from --external-data field1:value1 external-data field2:value2 + assert!(result[2] == (field1, value1)); + assert!(result[3] == (field2, value2)); + // from --external-data '{field3: value3, field4: value4}' + assert!(result[4] == (field3, value3)); + assert!(result[5] == (field4, value4)); +} diff --git a/crates/spfs-cli/main/src/cmd_runtime_info.rs b/crates/spfs-cli/main/src/cmd_runtime_info.rs index cfb49ace92..816543c8d1 100644 --- a/crates/spfs-cli/main/src/cmd_runtime_info.rs +++ b/crates/spfs-cli/main/src/cmd_runtime_info.rs @@ -4,6 +4,7 @@ use clap::Args; use miette::{Context, IntoDiagnostic, Result}; +use spfs_cli_common as cli; /// Show the complete state of a runtime #[derive(Debug, Args)] @@ -12,6 +13,9 @@ pub struct CmdRuntimeInfo { #[clap(short, long)] remote: Option, + #[clap(flatten)] + external_data: cli::ExternalDataViewing, + /// The name/id of the runtime to remove #[clap(env = "SPFS_RUNTIME")] name: String, @@ -28,6 +32,12 @@ impl CmdRuntimeInfo { }; let runtime = runtime_storage.read_runtime(&self.name).await?; + + if self.external_data.get_all || self.external_data.get.is_some() { + self.external_data.print_data(&runtime).await?; + return Ok(0); + } + serde_json::to_writer_pretty(std::io::stdout(), runtime.data()) .into_diagnostic() .wrap_err("Failed to generate json output")?; diff --git a/crates/spfs-cli/main/src/cmd_shell.rs b/crates/spfs-cli/main/src/cmd_shell.rs index fdf915f825..578a7688e9 100644 --- a/crates/spfs-cli/main/src/cmd_shell.rs +++ b/crates/spfs-cli/main/src/cmd_shell.rs @@ -7,6 +7,7 @@ use miette::Result; use spfs_cli_common as cli; use super::cmd_run; +use super::cmd_run::ExternalData; /// Enter a subshell in a configured spfs environment #[derive(Debug, Args)] @@ -48,6 +49,9 @@ pub struct CmdShell { #[clap(short, long, env = "SPFS_KEEP_RUNTIME")] pub keep_runtime: bool, + #[clap(flatten)] + pub external_data: ExternalData, + /// The tag or id of the desired runtime /// /// Use '-' or nothing to request an empty environment @@ -67,6 +71,7 @@ impl CmdShell { runtime_name: self.runtime_name.clone(), reference: self.reference.clone(), keep_runtime: self.keep_runtime, + external_data: self.external_data.clone(), command: Default::default(), }; run_cmd.run(config).await diff --git a/crates/spfs-cli/main/src/fixtures.rs b/crates/spfs-cli/main/src/fixtures.rs new file mode 100644 index 0000000000..96eee98c40 --- /dev/null +++ b/crates/spfs-cli/main/src/fixtures.rs @@ -0,0 +1,10 @@ +use rstest::fixture; +use tempfile::TempDir; + +#[fixture] +pub fn tmpdir() -> TempDir { + tempfile::Builder::new() + .prefix("spfs-test-") + .tempdir() + .expect("failed to create dir for test") +} diff --git a/crates/spfs-proto/schema/spfs.fbs b/crates/spfs-proto/schema/spfs.fbs index 071aa1de1c..25e42a442a 100644 --- a/crates/spfs-proto/schema/spfs.fbs +++ b/crates/spfs-proto/schema/spfs.fbs @@ -20,7 +20,8 @@ table Platform { } table Layer { - manifest:Digest (required); + manifest:Digest; + external_data:ExternalData; } table Manifest { @@ -34,6 +35,26 @@ table Blob { payload:Digest (required); } +/// External data that is small enough is stored as a string in the +/// layer, large data is stored outside the layer in a blob object +/// pointed at by a digest +union ExternalDataValue { ExternalDataString, ExternalDataDigest } + +/// Needed because unions have to contain tables +table ExternalDataString { + data:string; +} +/// Needed because unions have to contain tables +table ExternalDataDigest { + digest:Digest; +} + +/// External data held in a layer for use by external tools run inside a runtime +table ExternalData { + key:string (required); + data:ExternalDataValue (required); +} + /// Digest is the result of a hashing operation over binary data. struct Digest { bytes:[uint8:32]; // SHA-256 output len (256 / 8) diff --git a/crates/spfs/src/check.rs b/crates/spfs/src/check.rs index fc380bfdc0..84d20cbb36 100644 --- a/crates/spfs/src/check.rs +++ b/crates/spfs/src/check.rs @@ -12,6 +12,7 @@ use once_cell::sync::OnceCell; use progress_bar_derive_macro::ProgressBar; use tokio::sync::Semaphore; +use crate::graph::ExternalDataValue; use crate::prelude::*; use crate::sync::{SyncObjectResult, SyncPayloadResult, SyncPolicy}; use crate::{encoding, graph, storage, tracking, Error, Result}; @@ -333,10 +334,23 @@ where /// /// To also check if the layer object exists, use [`Self::check_digest`] pub async fn check_layer(&self, layer: graph::Layer) -> Result { - let result = self.check_digest(*layer.manifest()).await?; + let manifest_result = if let Some(manifest_digest) = layer.manifest() { + self.check_digest(*manifest_digest).await? + } else { + // This layer has no manifest, don't worry about it + CheckObjectResult::Ignorable + }; + let external_data_result = match layer.external_data() { + None => CheckExternalDataResult::Skipped, + Some(external_data) => self.check_external_data(external_data.into()).await?, + }; + let res = CheckLayerResult { layer, - result, + results: vec![ + manifest_result, + CheckObjectResult::ExternalData(external_data_result), + ], repaired: false, }; Ok(res) @@ -362,6 +376,26 @@ where Ok(res) } + /// Validate that the identified external data layer's value exists. + pub async fn check_external_data( + &self, + external_data: graph::ExternalData<'_>, + ) -> Result { + let res = match external_data.value() { + ExternalDataValue::String(_) => CheckExternalDataResult::Skipped, + ExternalDataValue::Blob(d) => { + let blob = self.repo.read_blob(d).await?; + let result = unsafe { self.check_blob(&blob).await? }; + CheckExternalDataResult::Checked { + digest: d.clone(), + result, + repaired: false, + } + } + }; + Ok(res) + } + /// Validate that the identified blob has its payload. /// /// To also check if the blob object exists, use [`Self::check_digest`] @@ -801,6 +835,7 @@ pub enum CheckObjectResult { Layer(Box), Blob(CheckBlobResult), Manifest(CheckManifestResult), + ExternalData(CheckExternalDataResult), } impl CheckObjectResult { @@ -815,6 +850,7 @@ impl CheckObjectResult { CheckObjectResult::Layer(r) => r.set_repaired(), CheckObjectResult::Blob(r) => r.set_repaired(), CheckObjectResult::Manifest(r) => r.set_repaired(), + CheckObjectResult::ExternalData(r) => r.set_repaired(), } } @@ -831,6 +867,7 @@ impl CheckObjectResult { Layer(res) => res.summary(), Blob(res) => res.summary(), Manifest(res) => res.summary(), + ExternalData(res) => res.summary(), } } } @@ -862,7 +899,7 @@ impl CheckPlatformResult { pub struct CheckLayerResult { pub repaired: bool, pub layer: graph::Layer, - pub result: CheckObjectResult, + pub results: Vec, } impl CheckLayerResult { @@ -872,7 +909,7 @@ impl CheckLayerResult { } pub fn summary(&self) -> CheckSummary { - let mut summary = self.result.summary(); + let mut summary: CheckSummary = self.results.iter().map(|r| r.summary()).sum(); summary += CheckSummary::checked_one_object(); if self.repaired { summary.repaired_objects += 1; @@ -904,6 +941,34 @@ impl CheckManifestResult { } } +#[derive(Debug)] +pub enum CheckExternalDataResult { + /// The external data was stored directly in the layer and did not + /// need checking + Skipped, + /// The external data was stored in a blob and was checked + Checked { + digest: encoding::Digest, + result: CheckBlobResult, + repaired: bool, + }, +} + +impl CheckExternalDataResult { + fn set_repaired(&mut self) { + if let Self::Checked { repaired, .. } = self { + *repaired = true; + } + } + + pub fn summary(&self) -> CheckSummary { + match self { + Self::Skipped => CheckSummary::default(), + Self::Checked { result, .. } => result.summary(), + } + } +} + #[derive(Debug)] pub enum CheckEntryResult { /// The entry was not one that needed checking diff --git a/crates/spfs/src/config.rs b/crates/spfs/src/config.rs index 852c3fe50c..8313786a39 100644 --- a/crates/spfs/src/config.rs +++ b/crates/spfs/src/config.rs @@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize}; use storage::{FromConfig, FromUrl}; use tokio_stream::StreamExt; +use crate::graph::SPFS_EXTERNAL_DATA_LAYER_MAX_STRING_VALUE_SIZE; use crate::{graph, runtime, storage, tracking, Error, Result}; #[cfg(test)] @@ -273,6 +274,13 @@ pub struct Filesystem { /// systems that can perform read-through lookups, such as FUSE. #[serde(default = "Filesystem::default_secondary_repositories")] pub secondary_repositories: Vec, + + /// The size limit for external data before the data is stored in + /// a sepearate blob payload referenced in an external data + /// layer. Data values smaller than or equal to this are stored + /// directly in the external data layer. + #[serde(default = "Filesystem::default_external_data_size_limit")] + pub external_data_size_limit: usize, } impl Filesystem { @@ -281,6 +289,13 @@ impl Filesystem { pub fn default_secondary_repositories() -> Vec { vec![String::from("origin")] } + + /// The default size limit for a piece of external data before it + /// is stored in a separate blob payload from the external data + /// layer that contains it + pub fn default_external_data_size_limit() -> usize { + SPFS_EXTERNAL_DATA_LAYER_MAX_STRING_VALUE_SIZE + } } /// Configuration options for the fuse filesystem process diff --git a/crates/spfs/src/find_path.rs b/crates/spfs/src/find_path.rs index 00316f5b82..a013e0bdf0 100644 --- a/crates/spfs/src/find_path.rs +++ b/crates/spfs/src/find_path.rs @@ -89,13 +89,15 @@ async fn find_path_in_spfs_item( } graph::object::Enum::Layer(obj) => { - let item = repo.read_object(*obj.manifest()).await?; - let paths_to_file = find_path_in_spfs_item(filepath, &item, repo).await?; - for path in paths_to_file { - let mut new_path: ObjectPath = Vec::new(); - new_path.push(ObjectPathEntry::Parent(obj.to_object())); - new_path.extend(path); - paths.push(new_path); + if let Some(manifest_digest) = obj.manifest() { + let item = repo.read_object(*manifest_digest).await?; + let paths_to_file = find_path_in_spfs_item(filepath, &item, repo).await?; + for path in paths_to_file { + let mut new_path: ObjectPath = Vec::new(); + new_path.push(ObjectPathEntry::Parent(obj.to_object())); + new_path.extend(path); + paths.push(new_path); + } } } @@ -115,9 +117,8 @@ async fn find_path_in_spfs_item( } graph::object::Enum::Blob(_) => { - // These are not examined here when searching for the - // filepath because the filepath will be found by walking - // Manifest objects. + // Not examined here when searching for the filepath because + // filepaths are only found by walking Manifest objects. } }; diff --git a/crates/spfs/src/graph/external_data.rs b/crates/spfs/src/graph/external_data.rs new file mode 100644 index 0000000000..08fee2e9ca --- /dev/null +++ b/crates/spfs/src/graph/external_data.rs @@ -0,0 +1,182 @@ +// Copyright (c) Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk + +use spfs_proto::ExternalDataArgs; + +use crate::{encoding, Result}; + +#[cfg(test)] +#[path = "./external_data_test.rs"] +mod external_data_test; + +/// Default size limit for string valus stored directly in an external +/// data object. Values larger than this are stored in a blob that is +/// referenced from the external data object. +pub const SPFS_EXTERNAL_DATA_LAYER_MAX_STRING_VALUE_SIZE: usize = 16 * 1024; + +// TODO: do we need this, if have one per layer? +/// ExternalData represents a key-value pair of data from an external +/// program. +#[derive(Debug, Eq, PartialEq, Clone, Hash)] +pub struct ExternalDataInput { + pub key: String, + pub value: ExternalDataValue, +} + +/// Legacy encoding values for distinguishing the kind of +/// ExternalDataValue being encoded. +const EXTERNAL_DATA_VALUE_STRING_CODE: u8 = 1; +const EXTERNAL_DATA_VALUE_BLOB_CODE: u8 = 2; + +/// Wrapper for the ways external data values are stored +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub enum ExternalDataValue { + /// In the ExternalData object as a string + String(String), + /// In a separate blob payload pointed at by the digest + Blob(encoding::Digest), +} + +impl ExternalDataValue { + /// The underlying spfs_proto enum entry for this kind of value. + pub fn data_type(&self) -> spfs_proto::ExternalDataValue { + match self { + ExternalDataValue::String(_) => spfs_proto::ExternalDataValue::ExternalDataString, + ExternalDataValue::Blob(_) => spfs_proto::ExternalDataValue::ExternalDataDigest, + } + } + + /// Return the data value as a string whether it is a string or a + /// digest for a blob object. + pub fn data_as_string(&self) -> String { + match self { + ExternalDataValue::String(data) => data.clone(), + ExternalDataValue::Blob(digest) => digest.to_string(), + } + } + + /// True if this value is stored directly as a string + pub fn is_string(&self) -> bool { + matches!(self, Self::String(_)) + } + + /// True if this value is stored in-directly in blob object + pub fn is_blob(&self) -> bool { + matches!(self, Self::Blob(_)) + } + + /// Note: This is used for legacy and new style digest calculations + pub fn legacy_encode(&self, mut writer: &mut impl std::io::Write) -> Result<()> { + match self { + ExternalDataValue::String(v) => { + encoding::write_uint8(&mut writer, EXTERNAL_DATA_VALUE_STRING_CODE)?; + Ok(encoding::write_string(writer, v.as_str())?) + } + ExternalDataValue::Blob(v) => { + encoding::write_uint8(&mut writer, EXTERNAL_DATA_VALUE_BLOB_CODE)?; + Ok(encoding::write_digest(writer, v)?) + } + } + } +} + +/// ExternalData represents a key-value pair of data from an external +/// program injected into a spfs runtime layer for later use by another +/// external program. +#[derive(Copy, Clone)] +pub struct ExternalData<'buf>(pub(super) spfs_proto::ExternalData<'buf>); + +impl<'buf> std::fmt::Debug for ExternalData<'buf> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ExternalData") + .field("key", &self.key()) + .field("value", &self.value()) + .finish() + } +} + +impl<'buf> From> for ExternalData<'buf> { + fn from(value: spfs_proto::ExternalData<'buf>) -> Self { + Self(value) + } +} + +impl<'buf> ExternalData<'buf> { + pub fn build<'fbb>( + &self, + builder: &mut flatbuffers::FlatBufferBuilder<'fbb>, + ) -> flatbuffers::WIPOffset> { + let fb_key = builder.create_string(self.key().as_ref()); + let fb_data = builder.create_string(self.value().data_as_string().as_ref()); + spfs_proto::ExternalData::create( + builder, + &ExternalDataArgs { + key: Some(fb_key), + data_type: self.0.data_type(), + data: Some(fb_data.as_union_value()), + }, + ) + } + + #[inline] + pub fn key(&self) -> &'buf str { + self.0.key() + } + + //#[inline] + pub fn value(&self) -> ExternalDataValue { + if let Some(data) = self.0.data_as_external_data_string() { + match data.data() { + Some(s) => ExternalDataValue::String(s.into()), + None => panic!( + "This should not happen because the data type was ExternalDataValueString" + ), + } + } else { + if let Some(data) = self.0.data_as_external_data_digest() { + match data.digest() { + Some(d) => ExternalDataValue::Blob((*d).into()), + None => panic!( + "This should not happen because the data type was ExternalDataValueDigest" + ), + } + } else { + panic!("This should not happen because the data type was ExternalDataValueDigest") + } + } + } + + /// Return the child objects of this object, if any. + pub fn child_objects(&self) -> Vec { + let mut result = Vec::new(); + if let Some(data) = self.0.data_as_external_data_digest() { + match data.digest() { + Some(d) => result.push((*d).into()), + None => {} + } + } + result + } + + /// Return the size of this external data + pub fn size(&self) -> u64 { + match &self.value() { + ExternalDataValue::String(v) => v.len() as u64, + ExternalDataValue::Blob(d) => d.len() as u64, + } + } + + /// Note: This is used for legacy and new style digest calculations + pub fn legacy_encode(&self, mut writer: &mut impl std::io::Write) -> Result<()> { + encoding::write_string(&mut writer, self.key())?; + self.value().legacy_encode(&mut writer)?; + Ok(()) + } +} + +impl<'buf> std::fmt::Display for ExternalData<'buf> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{}: {:?}", self.key(), self.value())) + } +} diff --git a/crates/spfs/src/graph/external_data_test.rs b/crates/spfs/src/graph/external_data_test.rs new file mode 100644 index 0000000000..c5f6257a7c --- /dev/null +++ b/crates/spfs/src/graph/external_data_test.rs @@ -0,0 +1,28 @@ +// Copyright (c) Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk + +use rstest::rstest; +use spfs_encoding::Digest; + +use super::ExternalDataValue; +use crate::encoding; + +#[rstest] +fn test_externaldatavalue_string() { + let value = String::from("value"); + let string_value = ExternalDataValue::String(value); + + assert!(string_value.is_string()); + assert!(!string_value.is_blob()); +} + +#[rstest] +fn test_externaldatavalue_blob() { + let digest: Digest = encoding::EMPTY_DIGEST.into(); + + let blob_value = ExternalDataValue::Blob(digest); + + assert!(blob_value.is_blob()); + assert!(!blob_value.is_string()); +} diff --git a/crates/spfs/src/graph/layer.rs b/crates/spfs/src/graph/layer.rs index 902bd5381f..7a46363e47 100644 --- a/crates/spfs/src/graph/layer.rs +++ b/crates/spfs/src/graph/layer.rs @@ -5,7 +5,7 @@ use spfs_proto::LayerArgs; use super::object::HeaderBuilder; -use super::ObjectKind; +use super::{ExternalData, ExternalDataInput, ExternalDataValue, ObjectKind}; use crate::{encoding, Error, Result}; #[cfg(test)] @@ -22,7 +22,8 @@ pub type Layer = super::object::FlatObject>; impl std::fmt::Debug for Layer { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Layer") - .field("manifest", &self.manifest().to_string()) + .field("manifest", &self.manifest()) + .field("external_data", &self.external_data()) .finish() } } @@ -36,24 +37,84 @@ impl Layer { Self::builder().with_manifest(manifest).build() } + /// Build a layer with the default header that has the provided + /// external data but does not point at any manifest, for more + /// configuration use [`Self::builder`] + #[inline] + pub fn new_with_external_data(external_data: ExternalDataInput) -> Self { + Self::builder().with_external_data(external_data).build() + } + + /// Build a layer with the default header that points at the + /// provided manifest digest and the provided external data, for + /// more configuration use [`Self::builder`] + #[inline] + pub fn new_with_manifest_and_external_data( + manifest: encoding::Digest, + external_data: ExternalDataInput, + ) -> Self { + Self::builder() + .with_manifest(manifest) + .with_external_data(external_data) + .build() + } + #[inline] pub fn builder() -> LayerBuilder { LayerBuilder::default() } #[inline] - pub fn manifest(&self) -> &encoding::Digest { + pub fn manifest(&self) -> Option<&encoding::Digest> { self.proto().manifest() } + #[inline] + pub fn external_data(&self) -> Option { + self.proto().external_data() + } + /// Return the child object of this one in the object DG. #[inline] pub fn child_objects(&self) -> Vec { - vec![*self.manifest()] + let mut children = Vec::new(); + if let Some(manifest_digest) = self.manifest() { + children.push(manifest_digest.clone()) + } + if let Some(data) = self.external_data() { + let external_data: ExternalData = data.into(); + children.extend(external_data.child_objects()); + } + children } - pub(super) fn legacy_encode(&self, writer: &mut impl std::io::Write) -> Result<()> { - encoding::write_digest(writer, self.manifest()).map_err(Error::Encoding) + pub(super) fn legacy_encode(&self, mut writer: &mut impl std::io::Write) -> Result<()> { + tracing::trace!("layer legacy_encode called..."); + let result = if let Some(manifest_digest) = self.manifest() { + let manifest_result = + encoding::write_digest(&mut writer, manifest_digest).map_err(Error::Encoding); + if let Some(data) = self.external_data() { + tracing::trace!("layer with both manifest and external data"); + let external_data: ExternalData = data.into(); + external_data.legacy_encode(&mut writer) + } else { + tracing::trace!("layer with manifest only: {manifest_result:?}"); + manifest_result + } + } else { + if let Some(data) = self.external_data() { + tracing::trace!("layer with external data only"); + let external_data: ExternalData = data.into(); + external_data.legacy_encode(&mut writer) + } else { + tracing::trace!("layer without either kind of data - an invalid layer"); + Err(Error::String( + "Invalid Layer object for legacy encoding, it has no manifest or external data" + .to_string(), + )) + } + }; + result } } @@ -73,14 +134,16 @@ impl std::cmp::Eq for Layer {} pub struct LayerBuilder { header: super::object::HeaderBuilder, - manifest: encoding::Digest, + manifest: Option, + external_data: Option, } impl Default for LayerBuilder { fn default() -> Self { Self { header: super::object::HeaderBuilder::new(ObjectKind::Layer), - manifest: encoding::NULL_DIGEST.into(), + manifest: None, + external_data: None, } } } @@ -95,16 +158,60 @@ impl LayerBuilder { } pub fn with_manifest(mut self, manifest: encoding::Digest) -> Self { - self.manifest = manifest; + self.manifest = Some(manifest); + self + } + + pub fn with_external_data(mut self, external_data: ExternalDataInput) -> Self { + self.external_data = Some(external_data); self } pub fn build(&self) -> Layer { super::BUILDER.with_borrow_mut(|builder| { + let external_data = match &self.external_data { + None => None, + Some(data) => { + let fb_key = builder.create_string(data.key.as_ref()); + let data_type = data.value.data_type(); + let fb_data = match &data.value { + ExternalDataValue::String(data_string) => { + let string_data = builder.create_string(data_string.as_ref()); + spfs_proto::ExternalDataString::create( + builder, + &spfs_proto::ExternalDataStringArgs { + data: Some(string_data), + }, + ) + .as_union_value() + } + ExternalDataValue::Blob(data_digest) => { + let blob_digest = data_digest.as_ref(); + spfs_proto::ExternalDataDigest::create( + builder, + &spfs_proto::ExternalDataDigestArgs { + digest: Some(blob_digest), + }, + ) + .as_union_value() + } + }; + Some(spfs_proto::ExternalData::create( + builder, + &spfs_proto::ExternalDataArgs { + key: Some(fb_key), + data_type, + data: Some(fb_data), + }, + )) + } + }; + let layer = spfs_proto::Layer::create( builder, &LayerArgs { - manifest: Some(&self.manifest), + manifest: self.manifest.as_ref(), + external_data, }, ); let any = spfs_proto::AnyObject::create( @@ -138,6 +245,10 @@ impl LayerBuilder { /// Read a data encoded using the legacy format, and /// use the data to fill and complete this builder pub fn legacy_decode(self, reader: &mut impl std::io::Read) -> Result { + tracing::trace!("layer legacy_decode called..."); + // Legacy layers do not have an external_data field. Trying + // to read a layer with no manifest and only external data + // here will fail. Ok(self.with_manifest(encoding::read_digest(reader)?).build()) } } diff --git a/crates/spfs/src/graph/mod.rs b/crates/spfs/src/graph/mod.rs index 87de2962b8..715a9bb826 100644 --- a/crates/spfs/src/graph/mod.rs +++ b/crates/spfs/src/graph/mod.rs @@ -8,6 +8,7 @@ mod blob; mod database; mod entry; pub mod error; +mod external_data; mod kind; mod layer; mod manifest; @@ -27,6 +28,12 @@ pub use database::{ DigestSearchCriteria, }; pub use entry::Entry; +pub use external_data::{ + ExternalData, + ExternalDataInput, + ExternalDataValue, + SPFS_EXTERNAL_DATA_LAYER_MAX_STRING_VALUE_SIZE, +}; pub use kind::{HasKind, Kind, ObjectKind}; pub use layer::Layer; pub use manifest::Manifest; diff --git a/crates/spfs/src/graph/object.rs b/crates/spfs/src/graph/object.rs index 8789135ca5..deeaafd5b8 100644 --- a/crates/spfs/src/graph/object.rs +++ b/crates/spfs/src/graph/object.rs @@ -10,7 +10,17 @@ use encoding::prelude::*; use serde::{Deserialize, Serialize}; use super::error::{ObjectError, ObjectResult}; -use super::{Blob, DatabaseView, HasKind, Kind, Layer, Manifest, ObjectKind, Platform}; +use super::{ + Blob, + DatabaseView, + ExternalData, + HasKind, + Kind, + Layer, + Manifest, + ObjectKind, + Platform, +}; use crate::encoding; use crate::storage::RepositoryHandle; @@ -157,8 +167,14 @@ impl Object { } } Enum::Layer(object) => { - let item = repo.read_object(*object.manifest()).await?; - next_iter_objects.push(item); + if let Some(manifest_digest) = object.manifest() { + let item = repo.read_object(*manifest_digest).await?; + next_iter_objects.push(item); + } + if let Some(data) = object.external_data() { + let external_data: ExternalData = data.into(); + total_size += external_data.size(); + } } Enum::Manifest(object) => { for node in object.to_tracking_manifest().walk_abs("/spfs") { diff --git a/crates/spfs/src/proto/conversions.rs b/crates/spfs/src/proto/conversions.rs index 8340e0495f..e50f266ed2 100644 --- a/crates/spfs/src/proto/conversions.rs +++ b/crates/spfs/src/proto/conversions.rs @@ -203,7 +203,17 @@ impl TryFrom for graph::Platform { impl From<&graph::Layer> for super::Layer { fn from(source: &graph::Layer) -> Self { Self { - manifest: Some(source.manifest().into()), + manifest: match source.manifest() { + None => None, + Some(m) => Some(m.into()), + }, + external_data: match source.external_data() { + None => None, + Some(data) => { + let external_data: graph::ExternalData = data.into(); + Some((&external_data).into()) + } + }, } } } @@ -211,7 +221,32 @@ impl From<&graph::Layer> for super::Layer { impl TryFrom for graph::Layer { type Error = Error; fn try_from(source: super::Layer) -> Result { - Ok(Self::new(convert_digest(source.manifest)?)) + let digest = Some(convert_digest(source.manifest)?); + + if let Some(manifest_digest) = digest { + if let Some(ffb_external_data) = source.external_data { + let external_data = ffb_external_data.try_into()?; + // A spfs filesystem layer with some external data attached + Ok(Self::new_with_manifest_and_external_data( + manifest_digest, + external_data, + )) + } else { + // A typical filesystem layer + Ok(Self::new(manifest_digest)) + } + } else { + if let Some(ffb_external_data) = source.external_data { + let external_data = ffb_external_data.try_into()?; + // An external data only layer + Ok(Self::new_with_external_data(external_data)) + } else { + Err(Error::String( + "Creating a graph::Layer requires at least one of: a manifest digest, " + .to_string(), + )) + } + } } } @@ -294,6 +329,34 @@ impl TryFrom for graph::Manifest { } } +impl From<&graph::ExternalData<'_>> for super::ExternalData { + fn from(source: &graph::ExternalData) -> Self { + use super::external_data::Value; + Self { + key: source.key().to_string(), + value: match source.value() { + graph::ExternalDataValue::String(s) => Some(Value::Data(s.clone())), + graph::ExternalDataValue::Blob(d) => Some(Value::Digest(d.into())), + }, + } + } +} + +impl TryFrom for graph::ExternalDataInput { + type Error = Error; + fn try_from(source: super::ExternalData) -> Result { + use super::external_data::Value; + Ok(Self { + key: source.key.clone(), + value: match source.value { + Some(Value::Data(s)) => graph::ExternalDataValue::String(s.clone()), + Some(Value::Digest(d)) => graph::ExternalDataValue::Blob(convert_digest(Some(d))?), + None => graph::ExternalDataValue::String(Default::default()), + }, + }) + } +} + impl<'buf> From<&graph::Tree<'buf>> for super::Tree { fn from(source: &graph::Tree) -> Self { Self { diff --git a/crates/spfs/src/proto/defs/types.proto b/crates/spfs/src/proto/defs/types.proto index c056310c67..2a67976629 100644 --- a/crates/spfs/src/proto/defs/types.proto +++ b/crates/spfs/src/proto/defs/types.proto @@ -30,6 +30,7 @@ message Platform { message Layer { Digest manifest = 1; + ExternalData external_data = 2; } message Manifest { @@ -37,6 +38,20 @@ message Manifest { repeated Tree trees = 2; } +message ExternalData { + string key = 1; + oneof value { + string data = 2; + Digest digest = 3; + } +} + +// TODO: is this needed? +//enum ExternalDataValue { +// STRING = 0; +// DIGEST = 1; +//} + message Tree { repeated Entry entries = 1; } diff --git a/crates/spfs/src/resolve.rs b/crates/spfs/src/resolve.rs index 4088bd41a7..1a578b9f3f 100644 --- a/crates/spfs/src/resolve.rs +++ b/crates/spfs/src/resolve.rs @@ -155,12 +155,14 @@ pub async fn compute_environment_manifest( let layers = resolve_stack_to_layers(&stack, Some(repo)).await?; let mut manifest = tracking::Manifest::default(); for layer in layers { - manifest.update( - &repo - .read_manifest(*layer.manifest()) - .await? - .to_tracking_manifest(), - ) + if let Some(manifest_digest) = layer.manifest() { + manifest.update( + &repo + .read_manifest(*manifest_digest) + .await? + .to_tracking_manifest(), + ) + } } Ok(manifest) } @@ -170,16 +172,24 @@ pub async fn compute_object_manifest( repo: &storage::RepositoryHandle, ) -> Result { match obj.into_enum() { - graph::object::Enum::Layer(obj) => Ok(repo - .read_manifest(*obj.manifest()) - .await? - .to_tracking_manifest()), + graph::object::Enum::Layer(obj) => { + if let Some(manifest_digest) = obj.manifest() { + Ok(repo + .read_manifest(*manifest_digest) + .await? + .to_tracking_manifest()) + } else { + Ok(tracking::Manifest::default()) + } + } graph::object::Enum::Platform(obj) => { let layers = resolve_stack_to_layers(&obj.to_stack(), Some(repo)).await?; let mut manifest = tracking::Manifest::default(); for layer in layers.iter() { - let layer_manifest = repo.read_manifest(*layer.manifest()).await?; - manifest.update(&layer_manifest.to_tracking_manifest()); + if let Some(manifest_digest) = layer.manifest() { + let layer_manifest = repo.read_manifest(*manifest_digest).await?; + manifest.update(&layer_manifest.to_tracking_manifest()); + } } Ok(manifest) } @@ -247,10 +257,12 @@ where let layers = resolve_stack_to_layers_with_repo(&runtime.status.stack, &repo).await?; let mut manifests = Vec::with_capacity(layers.len()); for (index, layer) in layers.iter().enumerate() { - manifests.push(ResolvedManifest::Existing { - order: index, - manifest: repo.read_manifest(*layer.manifest()).await?, - }); + if let Some(manifest_digest) = layer.manifest() { + manifests.push(ResolvedManifest::Existing { + order: index, + manifest: repo.read_manifest(*manifest_digest).await?, + }); + } } // Determine if layers need to be combined to stay within the length limits diff --git a/crates/spfs/src/runtime/mod.rs b/crates/spfs/src/runtime/mod.rs index 071a9e4b10..8c59f71d45 100644 --- a/crates/spfs/src/runtime/mod.rs +++ b/crates/spfs/src/runtime/mod.rs @@ -24,6 +24,7 @@ pub use storage::{ BindMount, Config, Data, + KeyValuePair, LiveLayer, LiveLayerFile, MountBackend, diff --git a/crates/spfs/src/runtime/storage.rs b/crates/spfs/src/runtime/storage.rs index 3d5c315900..cdd7e95c26 100644 --- a/crates/spfs/src/runtime/storage.rs +++ b/crates/spfs/src/runtime/storage.rs @@ -4,7 +4,7 @@ //! Definition and persistent storage of runtimes. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::env::temp_dir; use std::fmt::Display; use std::fs::OpenOptions; @@ -27,11 +27,14 @@ use tokio::io::AsyncReadExt; use super::startup_ps; #[cfg(unix)] use super::{startup_csh, startup_sh}; +use crate::encoding::Digest; use crate::env::SPFS_DIR_PREFIX; +use crate::graph::object::Enum; +use crate::graph::{ExternalData, ExternalDataValue}; use crate::prelude::*; use crate::storage::fs::DURABLE_EDITS_DIR; use crate::storage::RepositoryHandle; -use crate::{bootstrap, encoding, graph, storage, tracking, Error, Result}; +use crate::{bootstrap, graph, storage, tracking, Error, Result}; #[cfg(test)] #[path = "./storage_test.rs"] @@ -51,6 +54,9 @@ const TRANSIENT: bool = false; const LIVE_LAYER_FILE_SUFFIX_YAML: &str = ".spfs.yaml"; const DEFAULT_LIVE_LAYER_FILENAME: &str = "layer.spfs.yaml"; +/// Data type for pairs of external data keys and values +pub type KeyValuePair = (String, String); + /// Information about the source of a runtime #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct Author { @@ -79,7 +85,7 @@ pub struct Status { /// Additional layers that were created automatically due to the stack /// being too large. #[serde(default, skip_serializing_if = "HashSet::is_empty")] - pub(crate) flattened_layers: HashSet, + pub(crate) flattened_layers: HashSet, /// Whether or not this runtime is editable /// /// An editable runtime is mounted with working directories @@ -686,6 +692,89 @@ impl Runtime { self.data.is_durable() } + /// Store an arbitrary key-value string pair in the runtime + pub async fn add_external_data( + &mut self, + key: String, + value: String, + size_limit: usize, + ) -> Result<()> { + tracing::debug!( + "about to insert external data key: {} => value: {} [len: {} > {}]", + key, + value, + value.len(), + size_limit + ); + + let external_data_value = if value.len() <= size_limit { + tracing::debug!("about to store value directly in the external data layer"); + ExternalDataValue::String(value) + } else { + tracing::debug!("about to store value in a created blob"); + let digest = self.storage.create_blob_for_string(value).await?; + tracing::debug!("created a blob under {digest}"); + ExternalDataValue::Blob(digest) + }; + + tracing::trace!("external_data_value: {external_data_value:?}"); + + let external_data = graph::ExternalDataInput { + key, + value: external_data_value, + }; + + tracing::trace!("external_data: {external_data:?}"); + + let layer = graph::Layer::new_with_external_data(external_data); + + tracing::trace!("layer make with manifest: {:?}", layer.manifest()); + + self.storage.inner.write_object(&layer).await?; + + tracing::trace!("post-write_object"); + + // The new external data is added to the bottom of the runtime's stack + let layer_digest = layer.digest()?; + tracing::trace!("layer_digest of layer with external data: {layer_digest}"); + self.push_digest(layer_digest); + + tracing::debug!("pushed layer_digest to storage, about to return: {layer_digest}."); + Ok(()) + } + + /// Return the string value stored as external data under the given key. + pub async fn external_data(&self, key: &str) -> Result> { + for digest in self.status.stack.iter_bottom_up() { + if let Some(s) = self.storage.get_external_data(&digest, key).await? { + return Ok(Some(s)); + } + } + Ok(None) + } + + /// Return all the string values that are stored as external data under any key. + pub async fn all_external_data(&self) -> Result> { + let mut data: BTreeMap = BTreeMap::new(); + tracing::trace!( + " stack: {}", + self.status + .stack + .iter_bottom_up() + .map(|d| format!("{d}")) + .collect::>() + .join(", ") + ); + for digest in self.status.stack.iter_bottom_up() { + tracing::trace!(" before call get_external_key_value_pairs: {}", digest); + let pairs = self.storage.get_external_key_value_pairs(&digest).await?; + for (key, value) in pairs { + data.insert(key, value); + } + } + Ok(data) + } + /// Reset parts of the runtime's state so it can be reused in /// another process run. pub async fn reinit_for_reuse_and_save_to_storage(&mut self) -> Result<()> { @@ -874,7 +963,7 @@ impl Runtime { /// and change the overlayfs options, but not save the runtime or /// update any currently running environment. Returns false /// if adding the digest had no change to the runtime stack. - pub fn push_digest(&mut self, digest: encoding::Digest) -> bool { + pub fn push_digest(&mut self, digest: Digest) -> bool { self.status.stack.push(digest) } @@ -1144,6 +1233,120 @@ impl Storage { OwnedRuntime::upgrade_as_owner(rt).await } + /// Create a new blob payload to hold the given string value + pub(crate) async fn create_blob_for_string(&self, payload: String) -> Result { + self.inner + .commit_blob(Box::pin(std::io::Cursor::new(payload.into_bytes()))) + .await + } + + /// Returns the value from the first external data object matching + /// the given key that can be found starting from the given + /// digest's spfs object. + pub(crate) async fn get_external_data( + &self, + digest: &Digest, + key: &str, + ) -> Result> { + let mut digests_to_process: Vec = vec![*digest]; + + while !digests_to_process.is_empty() { + let mut next_iter_digests: Vec = Vec::new(); + for digest in digests_to_process.iter() { + match self + .inner + .read_ref(digest.to_string().as_str()) + .await? + .into_enum() + { + Enum::Platform(platform) => { + for reference in platform.iter_bottom_up() { + next_iter_digests.push(*reference); + } + } + Enum::Layer(layer) => { + if let Some(data) = layer.external_data() { + let external_data: ExternalData = data.into(); + if external_data.key() == key { + let value = self.get_external_data_value(&external_data).await?; + return Ok(Some(value)); + } + } + } + _ => { + // None of the other objects contain ExternalData + } + } + } + digests_to_process = std::mem::take(&mut next_iter_digests); + } + Ok(None) + } + + /// Returns all key-value pairs from the external data object that + /// can be found starting from the given digest's spfs object. + pub(crate) async fn get_external_key_value_pairs( + &self, + digest: &Digest, + ) -> Result> { + let mut key_value_pairs: Vec = Vec::new(); + + let mut digests_to_process: Vec = vec![*digest]; + while !digests_to_process.is_empty() { + let mut next_iter_digests: Vec = Vec::new(); + for digest in digests_to_process.iter() { + match self + .inner + .read_ref(digest.to_string().as_str()) + .await? + .into_enum() + { + Enum::Platform(platform) => { + for reference in platform.iter_bottom_up() { + next_iter_digests.push(*reference); + } + } + Enum::Layer(layer) => { + if let Some(data) = layer.external_data() { + let external_data: ExternalData = data.into(); + let key = external_data.key(); + let value = self.get_external_data_value(&external_data).await?; + key_value_pairs.push((key.to_string(), value)); + } + } + _ => { + // None of the other objects contain ExternalDataLayers + } + } + } + digests_to_process = std::mem::take(&mut next_iter_digests); + } + Ok(key_value_pairs) + } + + /// Get the value as a string from the given external data + async fn get_external_data_value(&self, external_data: &ExternalData<'_>) -> Result { + let data = match external_data.value() { + ExternalDataValue::String(s) => s.clone(), + ExternalDataValue::Blob(digest) => { + let blob = self.inner.read_blob(digest).await?; + let (mut payload, filename) = self.inner.open_payload(*blob.digest()).await?; + let mut writer: Vec = vec![]; + tokio::io::copy(&mut payload, &mut writer) + .await + .map_err(|err| { + Error::StorageReadError( + "copy of external data payload to string buffer", + filename, + err, + ) + })?; + String::from_utf8(writer).map_err(|err| Error::String(err.to_string()))? + } + }; + Ok(data) + } + pub async fn durable_path(&self, name: String) -> Result { match &*self.inner { RepositoryHandle::FS(repo) => { diff --git a/crates/spfs/src/runtime/storage_test.rs b/crates/spfs/src/runtime/storage_test.rs index dc7206b006..dc0a5393b3 100644 --- a/crates/spfs/src/runtime/storage_test.rs +++ b/crates/spfs/src/runtime/storage_test.rs @@ -10,12 +10,16 @@ use std::str::FromStr; use futures::TryStreamExt; use rstest::rstest; +use spfs_encoding::Digestible; use super::{makedirs_with_perms, Data, Storage}; use crate::encoding; use crate::fixtures::*; +use crate::graph::object::EncodingFormat; +use crate::graph::{ExternalDataInput, ExternalDataValue, Layer, Platform}; use crate::runtime::storage::{LiveLayerApiVersion, LiveLayerContents}; use crate::runtime::{BindMount, LiveLayer, LiveLayerFile}; +use crate::storage::prelude::Database; #[rstest] fn test_bindmount_creation() { @@ -172,6 +176,248 @@ async fn test_storage_create_runtime(tmpdir: tempfile::TempDir) { .is_err()); } +#[rstest] +#[tokio::test] +async fn test_storage_runtime_with_external_data(tmpdir: tempfile::TempDir) { + let root = tmpdir.path().to_string_lossy().to_string(); + let repo = crate::storage::RepositoryHandle::from( + crate::storage::fs::FsRepository::create(root) + .await + .unwrap(), + ); + let storage = Storage::new(repo); + let limit: usize = 16 * 1024; + + let keep_runtime = false; + let live_layers = Vec::new(); + let mut runtime = storage + .create_named_runtime("test-with-external-data", keep_runtime, live_layers) + .await + .expect("failed to create runtime in storage"); + + // Test - insert data + let key = "somefield".to_string(); + let value = "some value".to_string(); + assert!(runtime + .add_external_data(key.clone(), value.clone(), limit) + .await + .is_ok()); + + // Test - insert some more data + let value2 = "someothervalue".to_string(); + assert!(runtime + .add_external_data(key.clone(), value2, limit) + .await + .is_ok()); + + // Test - retrieve data - the first inserted data should be the + // what is retrieved because of how adding to the runtime stack + // works. + if EncodingFormat::default() == EncodingFormat::Legacy { + if let Ok(_) = runtime.external_data(&key).await { + panic!("This should fail when EncodingFormat::Legacy is the default") + } + // Don't run the rest of the test when EncodingFormat::Legacy is used + return; + }; + + let result = runtime.external_data(&key).await.unwrap(); + assert!(result.is_some()); + + assert!(value == *result.unwrap()); +} + +#[rstest] +#[tokio::test] +async fn test_storage_runtime_with_nested_external_data(tmpdir: tempfile::TempDir) { + // Setup the objects needed for the runtime used in the test + let root = tmpdir.path().to_string_lossy().to_string(); + let repo = crate::storage::RepositoryHandle::from( + crate::storage::fs::FsRepository::create(root) + .await + .unwrap(), + ); + + // make an external data layer + let key = "somefield".to_string(); + let value = "somevalue".to_string(); + let external_data = ExternalDataInput { + key: key.clone(), + value: ExternalDataValue::String(value.clone()), + }; + let layer = Layer::new_with_external_data(external_data); + repo.write_object(&layer).await.unwrap(); + + // make a platform that contains the external data layer + let layers: Vec = vec![layer.digest().unwrap()]; + let platform = Platform::from_iter(layers); + repo.write_object(&platform.clone()).await.unwrap(); + + // put the platform into a runtime + let storage = Storage::new(repo); + let keep_runtime = false; + let live_layers = Vec::new(); + let mut runtime = storage + .create_named_runtime("test-with-external-data-nested", keep_runtime, live_layers) + .await + .expect("failed to create runtime in storage"); + runtime.push_digest(platform.digest().unwrap()); + + if EncodingFormat::default() == EncodingFormat::Legacy { + if let Ok(_) = runtime.external_data(&key).await { + panic!("This should fail when EncodingFormat::Legacy is the default") + } + // Don't run the rest of the test when EncodingFormat::Legacy is used + return; + }; + + // Test - retrieve the data even though it is nested inside a + // platform object in the runtime. + let result = runtime.external_data(&key).await.unwrap(); + assert!(result.is_some()); + + assert!(value == *result.unwrap()); +} + +#[rstest] +#[tokio::test] +async fn test_storage_runtime_with_external_data_all(tmpdir: tempfile::TempDir) { + let root = tmpdir.path().to_string_lossy().to_string(); + let repo = crate::storage::RepositoryHandle::from( + crate::storage::fs::FsRepository::create(root) + .await + .unwrap(), + ); + let storage = Storage::new(repo); + let limit: usize = 16 * 1024; + + let keep_runtime = false; + let live_layers = Vec::new(); + let mut runtime = storage + .create_named_runtime("test-with-external-data-all", keep_runtime, live_layers) + .await + .expect("failed to create runtime in storage"); + + // Test - insert two distinct data values + let key = "somefield".to_string(); + let value = "somevalue".to_string(); + + assert!(runtime + .add_external_data(key.clone(), value.clone(), limit) + .await + .is_ok()); + + let key2 = "somefield2".to_string(); + let value2 = "somevalue2".to_string(); + assert!(runtime + .add_external_data(key2.clone(), value2.clone(), limit) + .await + .is_ok()); + + // Test - get all the data back out + if EncodingFormat::default() == EncodingFormat::Legacy { + if let Ok(_) = runtime.all_external_data().await { + panic!("This should fail when EncodingFormat::Legacy is the default") + } + // Don't run the rest of the test when EncodingFormat::Legacy is used + return; + }; + + let result = runtime.all_external_data().await.unwrap(); + + assert!(result.len() == 2); + for (expected_key, expected_value) in [(key, value), (key2, value2)].iter() { + assert!(result.get(expected_key).is_some()); + match result.get(expected_key) { + Some(v) => { + assert!(v == expected_value); + } + None => panic!("Value missing for {expected_key} when getting all external data"), + } + } +} + +#[rstest] +#[tokio::test] +async fn test_storage_runtime_with_nested_external_data_all(tmpdir: tempfile::TempDir) { + // setup the objects needed for the runtime used in the test + let root = tmpdir.path().to_string_lossy().to_string(); + let repo = crate::storage::RepositoryHandle::from( + crate::storage::fs::FsRepository::create(root) + .await + .unwrap(), + ); + + // make two distinct data values + let key = "somefield".to_string(); + let value = "somevalue".to_string(); + let external_data = ExternalDataInput { + key: key.clone(), + value: ExternalDataValue::String(value.clone()), + }; + + let layer = Layer::new_with_external_data(external_data); + repo.write_object(&layer.clone()).await.unwrap(); + + let key2 = "somefield2".to_string(); + let value2 = "somevalue2".to_string(); + let external_data2 = ExternalDataInput { + key: key2.clone(), + value: ExternalDataValue::String(value2.clone()), + }; + + let layer2 = Layer::new_with_external_data(external_data2.clone()); + repo.write_object(&layer2.clone()).await.unwrap(); + + // make a platform with one external data layer + let layers: Vec = vec![layer.digest().unwrap()]; + let platform = Platform::from_iter(layers); + repo.write_object(&platform.clone()).await.unwrap(); + + // make another platform with the first platform and the other + // external data layer. this second platform is in the runtime + let layers2: Vec = vec![platform.digest().unwrap(), layer2.digest().unwrap()]; + let platform2 = Platform::from_iter(layers2); + repo.write_object(&platform2.clone()).await.unwrap(); + + // finally set up the runtime + let storage = Storage::new(repo); + + let keep_runtime = false; + let live_layers = Vec::new(); + let mut runtime = storage + .create_named_runtime( + "test-with-external-data-all-nested", + keep_runtime, + live_layers, + ) + .await + .expect("failed to create runtime in storage"); + runtime.push_digest(platform2.digest().unwrap()); + + // Test - get all the data back out even thought it is nested at + // different levels in different platform objects in the runtime + if EncodingFormat::default() == EncodingFormat::Legacy { + if let Ok(_) = runtime.all_external_data().await { + panic!("This should fail when EncodingFormat::Legacy is the default") + } + // Don't run the rest of the test when EncodingFormat::Legacy is used + return; + }; + + let result = runtime.all_external_data().await.unwrap(); + assert!(result.len() == 2); + for (expected_key, expected_value) in [(key, value), (key2, value2)].iter() { + assert!(result.get(expected_key).is_some()); + match result.get(expected_key) { + Some(v) => { + assert!(v == expected_value); + } + None => panic!("Value missing for {expected_key} when getting all external data"), + } + } +} + #[rstest] #[tokio::test] async fn test_storage_remove_runtime(tmpdir: tempfile::TempDir) { diff --git a/crates/spfs/src/storage/fs/renderer_unix.rs b/crates/spfs/src/storage/fs/renderer_unix.rs index 1e648a8b0e..96c358a5b6 100644 --- a/crates/spfs/src/storage/fs/renderer_unix.rs +++ b/crates/spfs/src/storage/fs/renderer_unix.rs @@ -261,17 +261,19 @@ where .map_err(|err| err.wrap("resolve stack to layers"))?; let mut futures = futures::stream::FuturesOrdered::new(); for layer in layers { - let digest = *layer.manifest(); - let fut = self - .repo - .read_manifest(digest) - .map_err(move |err| err.wrap(format!("read manifest {digest}"))) - .and_then(move |manifest| async move { - self.render_manifest(&manifest, render_type) - .await - .map_err(move |err| err.wrap(format!("render manifest {digest}"))) - }); - futures.push_back(fut); + if let Some(manifest_digest) = layer.manifest() { + let digest = manifest_digest.clone(); + let fut = self + .repo + .read_manifest(digest) + .map_err(move |err| err.wrap(format!("read manifest {digest}"))) + .and_then(move |manifest| async move { + self.render_manifest(&manifest, render_type) + .await + .map_err(move |err| err.wrap(format!("render manifest {digest}"))) + }); + futures.push_back(fut); + } } futures.try_collect().await } @@ -293,7 +295,9 @@ where let layers = crate::resolve::resolve_stack_to_layers_with_repo(&stack, self.repo).await?; let mut manifests = Vec::with_capacity(layers.len()); for layer in layers { - manifests.push(self.repo.read_manifest(*layer.manifest()).await?); + if let Some(manifest_digest) = layer.manifest() { + manifests.push(self.repo.read_manifest(*manifest_digest).await?); + } } let mut manifest = tracking::Manifest::default(); for next in manifests.into_iter() { diff --git a/crates/spfs/src/sync.rs b/crates/spfs/src/sync.rs index f645481d73..125b6fbaac 100644 --- a/crates/spfs/src/sync.rs +++ b/crates/spfs/src/sync.rs @@ -9,6 +9,7 @@ use once_cell::sync::OnceCell; use progress_bar_derive_macro::ProgressBar; use tokio::sync::Semaphore; +use crate::graph::{ExternalData, ExternalDataValue}; use crate::prelude::*; use crate::{encoding, graph, storage, tracking, Error, Result}; @@ -192,7 +193,7 @@ where /// Sync one environment item and any associated data. pub async fn sync_env_item(&self, item: tracking::EnvSpecItem) -> Result { - tracing::debug!(?item, "Syncing item"); + tracing::debug!(?item, "Syncing item: {item}"); self.reporter.visit_env_item(&item); let res = match item { tracking::EnvSpecItem::Digest(digest) => self @@ -316,10 +317,28 @@ where } self.reporter.visit_layer(&layer); - let manifest = self.src.read_manifest(*layer.manifest()).await?; - let result = self.sync_manifest(manifest).await?; + + let manifest_result = if let Some(manifest_digest) = layer.manifest() { + let manifest = self.src.read_manifest(*manifest_digest).await?; + self.sync_manifest(manifest).await? + } else { + SyncManifestResult::Skipped + }; + let external_data_result = if let Some(data) = layer.external_data() { + let external_data: ExternalData = data.into(); + self.sync_external_data(external_data.clone()).await? + } else { + SyncExternalDataResult::Skipped + }; + self.dest.write_object(&layer).await?; - let res = SyncLayerResult::Synced { layer, result }; + let res = SyncLayerResult::Synced { + layer, + results: vec![ + SyncObjectResult::Manifest(manifest_result), + SyncObjectResult::ExternalData(external_data_result), + ], + }; self.reporter.synced_layer(&res); Ok(res) } @@ -360,6 +379,28 @@ where Ok(res) } + async fn sync_external_data( + &self, + external_data: graph::ExternalData<'_>, + ) -> Result { + match external_data.value() { + ExternalDataValue::String(_) => Ok(SyncExternalDataResult::Skipped), + ExternalDataValue::Blob(digest) => { + if !self.processed_digests.insert(digest) { + return Ok(SyncExternalDataResult::Duplicate); + } + self.reporter.visit_external_data(&external_data); + let sync_result = self.sync_digest(digest).await?; + let res = SyncExternalDataResult::Synced { + digest: digest.clone(), + result: Box::new(sync_result), + }; + self.reporter.synced_external_data(&res); + Ok(res) + } + } + } + async fn sync_entry(&self, entry: graph::Entry<'_>) -> Result { if !entry.kind().is_blob() { return Ok(SyncEntryResult::Skipped); @@ -534,6 +575,12 @@ pub trait SyncReporter: Send + Sync { /// Called when a manifest has finished syncing fn synced_manifest(&self, _result: &SyncManifestResult) {} + /// Called when an external_data_layer has been identified to sync + fn visit_external_data(&self, _external_data: &graph::ExternalData) {} + + /// Called when an external_data_layer has finished syncing + fn synced_external_data(&self, _result: &SyncExternalDataResult) {} + /// Called when an entry has been identified to sync fn visit_entry(&self, _entry: &graph::Entry<'_>) {} @@ -743,6 +790,7 @@ pub enum SyncObjectResult { Layer(SyncLayerResult), Blob(SyncBlobResult), Manifest(SyncManifestResult), + ExternalData(SyncExternalDataResult), } impl SyncObjectResult { @@ -758,6 +806,7 @@ impl SyncObjectResult { R::Layer(res) => res.summary(), R::Blob(res) => res.summary(), R::Manifest(res) => res.summary(), + R::ExternalData(res) => res.summary(), } } } @@ -797,7 +846,7 @@ pub enum SyncLayerResult { /// The layer was synced Synced { layer: graph::Layer, - result: SyncManifestResult, + results: Vec, }, } @@ -805,8 +854,8 @@ impl SyncLayerResult { pub fn summary(&self) -> SyncSummary { match self { Self::Skipped | Self::Duplicate => SyncSummary::skipped_one_object(), - Self::Synced { result, .. } => { - let mut summary = result.summary(); + Self::Synced { results, .. } => { + let mut summary = results.iter().map(|r| r.summary()).sum(); summary += SyncSummary::synced_one_object(); summary } @@ -840,6 +889,31 @@ impl SyncManifestResult { } } +#[derive(Debug)] +pub enum SyncExternalDataResult { + /// The external data did not need to be synced + Skipped, + /// The external data was already synced in this session + Duplicate, + /// The external data was stored in a blob and was synced + Synced { + digest: encoding::Digest, + result: Box, + }, +} + +impl SyncExternalDataResult { + pub fn summary(&self) -> SyncSummary { + match self { + Self::Skipped | Self::Duplicate => SyncSummary::default(), + Self::Synced { + digest: _, + result: _, + } => SyncSummary::synced_one_object(), + } + } +} + #[derive(Debug)] pub enum SyncEntryResult { /// The entry did not need to be synced diff --git a/crates/spk-build/src/build/binary.rs b/crates/spk-build/src/build/binary.rs index bbd8e824c5..03bf1ea24a 100644 --- a/crates/spk-build/src/build/binary.rs +++ b/crates/spk-build/src/build/binary.rs @@ -843,8 +843,16 @@ where .with_path_filter(collected_changes.as_slice()) .commit_layer(&mut runtime) .await?; + + let manifest_digest = match layer.manifest() { + Some(d) => d, + None => { + return Err(Error::String("Collected changes became a layer with no manifest. This should not happen during a binary build.".to_string())); + } + }; + let collected_layer = repo - .read_manifest(*layer.manifest()) + .read_manifest(*manifest_digest) .await? .to_tracking_manifest(); let manifests = split_manifest_by_component( diff --git a/crates/spk-build/src/build/binary_test.rs b/crates/spk-build/src/build/binary_test.rs index b9c9690969..c5adf8f229 100644 --- a/crates/spk-build/src/build/binary_test.rs +++ b/crates/spk-build/src/build/binary_test.rs @@ -593,11 +593,17 @@ async fn test_build_package_source_cleanup() { let config = spfs::get_config().unwrap(); let repo = config.get_local_repository().await.unwrap(); let layer = repo.read_layer(digest).await.unwrap(); + + let manifest_digest = match layer.manifest() { + None => panic!("This layer should have a manifest digest!"), + Some(d) => d, + }; let manifest = repo - .read_manifest(*layer.manifest()) + .read_manifest(*manifest_digest) .await .unwrap() .to_tracking_manifest(); + let entry = manifest.get_path(data_path(src_pkg.ident())); assert!( entry.is_none() || entry.unwrap().entries.is_empty(), @@ -683,8 +689,13 @@ async fn test_build_filters_reset_files() { let config = spfs::get_config().unwrap(); let repo = config.get_local_repository().await.unwrap(); let layer = repo.read_layer(digest).await.unwrap(); + + let manifest_digest = match layer.manifest() { + None => panic!("This layer should have a manifest digest!"), + Some(d) => d, + }; let manifest = repo - .read_manifest(*layer.manifest()) + .read_manifest(*manifest_digest) .await .unwrap() .to_tracking_manifest(); diff --git a/crates/spk-cli/cmd-build/src/cmd_build_test.rs b/crates/spk-cli/cmd-build/src/cmd_build_test.rs index 71fdc5cffc..5b6df45ddd 100644 --- a/crates/spk-cli/cmd-build/src/cmd_build_test.rs +++ b/crates/spk-cli/cmd-build/src/cmd_build_test.rs @@ -472,7 +472,11 @@ build: let layer = repo.read_layer(digest).await.unwrap(); let manifest = repo - .read_manifest(*layer.manifest()) + .read_manifest( + *layer + .manifest() + .expect("Layer should have a manifest in this test"), + ) .await .unwrap() .to_tracking_manifest(); @@ -572,7 +576,11 @@ build: let layer = repo.read_layer(digest).await.unwrap(); let manifest = repo - .read_manifest(*layer.manifest()) + .read_manifest( + *layer + .manifest() + .expect("Layer should have a manifest in this test"), + ) .await .unwrap() .to_tracking_manifest(); diff --git a/crates/spk-cli/cmd-du/src/cmd_du.rs b/crates/spk-cli/cmd-du/src/cmd_du.rs index c5b0962921..75b06d1bef 100644 --- a/crates/spk-cli/cmd-du/src/cmd_du.rs +++ b/crates/spk-cli/cmd-du/src/cmd_du.rs @@ -354,8 +354,14 @@ impl Du { } } Enum::Layer(object) => { - item = repo.read_object(*object.manifest()).await?; + let manifest_digest = match object.manifest() { + None => continue, + Some(d) => d, + }; + item = repo.read_object(*manifest_digest).await?; next_iter_objects.push(item); + // TODO: what about external data stored in a blob, + // that isn't counted here? } Enum::Manifest(object) => { let tracking_manifest = object.to_tracking_manifest(); diff --git a/crates/spk-exec/src/exec.rs b/crates/spk-exec/src/exec.rs index 2592f64f05..1cb535088d 100644 --- a/crates/spk-exec/src/exec.rs +++ b/crates/spk-exec/src/exec.rs @@ -48,7 +48,11 @@ impl ResolvedLayers { let object = repo.read_object(resolved_layer.digest).await?; match object.into_enum() { Enum::Layer(obj) => { - match repo.read_object(*obj.manifest()).await?.into_enum() { + let manifest_digest = match obj.manifest() { + None => continue, + Some(m) => m + }; + match repo.read_object(*manifest_digest).await?.into_enum() { Enum::Manifest(obj) => obj, _ => continue, } diff --git a/crates/spk-storage/src/fixtures.rs b/crates/spk-storage/src/fixtures.rs index 94dd6728d8..ff482aae0a 100644 --- a/crates/spk-storage/src/fixtures.rs +++ b/crates/spk-storage/src/fixtures.rs @@ -69,7 +69,7 @@ impl std::ops::Deref for TempRepo { /// Returns an empty spfs layer object for easy testing pub fn empty_layer() -> spfs::graph::Layer { - spfs::graph::Layer::new(spfs::encoding::EMPTY_DIGEST.into()) + spfs::graph::Layer::new(Default::default()) } /// Returns the digest for an empty spfs layer. diff --git a/crates/spk-storage/src/storage/archive.rs b/crates/spk-storage/src/storage/archive.rs index 70c5dcabce..0f10871624 100644 --- a/crates/spk-storage/src/storage/archive.rs +++ b/crates/spk-storage/src/storage/archive.rs @@ -27,7 +27,6 @@ where _ => tracing::warn!("Error trying to remove old file: {:?}", err), } } - filename .parent() .map(|dir| { @@ -35,14 +34,12 @@ where .map_err(|err| Error::DirectoryCreateError(dir.to_owned(), err)) }) .unwrap_or_else(|| Ok(()))?; - // Don't require the "origin" repo to exist here. let (local_repo, remote_repo) = tokio::join!( super::local_repository(), super::remote_repository("origin"), ); let local_repo = local_repo?; - let tar_repo = spfs::storage::tar::TarRepository::create(&filename) .await .map_err(|source| spfs::Error::FailedToOpenRepository { @@ -52,12 +49,10 @@ where // Package exports should not include the top-level directory for // durable runtime upperdir edits. tar_repo.remove_durable_dir().await?; - let mut target_repo = super::SpfsRepository::try_from(( "archive", spfs::storage::RepositoryHandle::from(tar_repo), ))?; - // these are sorted to ensure that the recipe is published // before any build - it's only an error in testing, but still best practice let mut to_transfer = std::collections::BTreeSet::new(); @@ -85,7 +80,6 @@ where } else { to_transfer.insert(pkg.with_build(None)); } - for transfer_pkg in to_transfer.into_iter() { if transfer_pkg.is_embedded() { // Don't attempt to export an embedded package; the stub @@ -166,7 +160,6 @@ where .or(remote_err) .unwrap_or_else(|| Error::PackageNotFound(transfer_pkg))); } - tracing::info!(path=?filename, "building archive"); use std::ops::DerefMut; if let spfs::storage::RepositoryHandle::Tar(tar) = target_repo.deref_mut() { @@ -206,7 +199,6 @@ async fn copy_package( ) -> Result<()> { let spec = src_repo.read_package(pkg).await?; let components = src_repo.read_components(pkg).await?; - tracing::info!(%pkg, "exporting"); let syncer = spfs::Syncer::new(src_repo, dst_repo) .with_reporter(spfs::sync::ConsoleSyncReporter::default()); let desired = components.iter().map(|i| *i.1).collect(); diff --git a/crates/spk-storage/src/storage/runtime.rs b/crates/spk-storage/src/storage/runtime.rs index 75e3e0bb95..bbae56ff34 100644 --- a/crates/spk-storage/src/storage/runtime.rs +++ b/crates/spk-storage/src/storage/runtime.rs @@ -489,12 +489,14 @@ async fn find_layer_by_filename>(path: S) -> Result>( let layers = spfs::resolve_stack_to_layers(&runtime.status.stack, Some(&repo)).await?; for layer in layers.iter().rev() { + let manifest_digest = match layer.manifest() { + None => continue, + Some(d) => d, + }; + let manifest = repo - .read_manifest(*layer.manifest()) + .read_manifest(*manifest_digest) .await? .to_tracking_manifest();