From f20b03bf1e272e64ff7f707101b79e775d962a3b Mon Sep 17 00:00:00 2001 From: David Gilligan-Cook Date: Mon, 24 Jul 2023 11:13:53 -0700 Subject: [PATCH] Adds external key-value string data storage to runtime via ExternalDataLayer objects. Adds --external-data argument to spfs run/shell, --get and --get-all arguments to spfs info, spfs runtime info, and adds support for reading the value from an ExternalDataLayer using spfs read. Signed-off-by: David Gilligan-Cook --- Cargo.lock | 4 + crates/spfs-cli/common/Cargo.toml | 1 + crates/spfs-cli/common/src/args.rs | 54 ++++- crates/spfs-cli/common/src/lib.rs | 2 +- crates/spfs-cli/main/Cargo.toml | 5 + crates/spfs-cli/main/src/cmd_info.rs | 16 ++ crates/spfs-cli/main/src/cmd_read.rs | 10 + crates/spfs-cli/main/src/cmd_run.rs | 105 ++++++++- crates/spfs-cli/main/src/cmd_run_test.rs | 196 ++++++++++++++++ crates/spfs-cli/main/src/cmd_runtime_info.rs | 10 + crates/spfs-cli/main/src/cmd_shell.rs | 5 + crates/spfs-cli/main/src/fixtures.rs | 10 + crates/spfs/src/check.rs | 56 +++++ crates/spfs/src/config.rs | 15 ++ crates/spfs/src/find_path.rs | 3 +- crates/spfs/src/graph/external_data_layer.rs | 119 ++++++++++ .../src/graph/external_data_layer_test.rs | 101 ++++++++ crates/spfs/src/graph/mod.rs | 6 + crates/spfs/src/graph/object.rs | 18 +- crates/spfs/src/io.rs | 1 + crates/spfs/src/proto/conversions.rs | 30 +++ crates/spfs/src/proto/defs/types.proto | 14 ++ crates/spfs/src/resolve.rs | 4 + crates/spfs/src/runtime/mod.rs | 1 + crates/spfs/src/runtime/storage.rs | 193 +++++++++++++++- crates/spfs/src/runtime/storage_test.rs | 218 ++++++++++++++++++ crates/spfs/src/sync.rs | 59 +++++ crates/spk-cli/cmd-du/src/cmd_du.rs | 3 +- 28 files changed, 1248 insertions(+), 11 deletions(-) create mode 100644 crates/spfs-cli/main/src/cmd_run_test.rs create mode 100644 crates/spfs-cli/main/src/fixtures.rs create mode 100644 crates/spfs/src/graph/external_data_layer.rs create mode 100644 crates/spfs/src/graph/external_data_layer_test.rs diff 
--git a/Cargo.lock b/Cargo.lock index 81e54900d6..ced75cb602 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3396,6 +3396,7 @@ dependencies = [ "sentry-anyhow", "sentry-tracing", "serde_json", + "serde_yaml 0.9.27", "spfs", "strip-ansi-escapes", "syslog-tracing", @@ -3467,10 +3468,13 @@ dependencies = [ "number_prefix", "procfs", "relative-path", + "rstest", "serde_json", + "serde_yaml 0.9.27", "spfs", "spfs-cli-common", "strum", + "tempfile", "tokio", "tokio-stream", "tonic", diff --git a/crates/spfs-cli/common/Cargo.toml b/crates/spfs-cli/common/Cargo.toml index 2139cd2599..f9be003fbd 100644 --- a/crates/spfs-cli/common/Cargo.toml +++ b/crates/spfs-cli/common/Cargo.toml @@ -24,6 +24,7 @@ sentry = { workspace = true, optional = true } sentry-anyhow = { workspace = true, optional = true } sentry-tracing = { workspace = true, optional = true } serde_json = { version = "1.0.57", optional = true } +serde_yaml = { workspace = true } spfs = { path = "../../spfs" } strip-ansi-escapes = { workspace = true, optional = true } syslog-tracing = "0.2.0" diff --git a/crates/spfs-cli/common/src/args.rs b/crates/spfs-cli/common/src/args.rs index 1cda33a885..81f470e406 100644 --- a/crates/spfs-cli/common/src/args.rs +++ b/crates/spfs-cli/common/src/args.rs @@ -7,9 +7,10 @@ use std::panic::catch_unwind; #[cfg(feature = "sentry")] use std::sync::Mutex; -use anyhow::Error; +use anyhow::{Error, Result}; #[cfg(feature = "sentry")] use once_cell::sync::OnceCell; +use spfs::io::Pluralize; use spfs::storage::LocalRepository; use tracing_subscriber::prelude::*; @@ -416,6 +417,57 @@ impl Logging { } } +/// Command line flags for viewing external data in a runtime +#[derive(Debug, Clone, clap::Args)] +pub struct ExternalDataViewing { + /// Output the data value for the given external data key(s) from + /// the active runtime. The each value is output on its own line + /// without its key. 
+ #[clap(long, alias = "external_data")] + pub get: Option>, + + /// Output all the external data keys and values from the active + /// runtime as a yaml dictionary + #[clap(long, alias = "all_external_data")] + pub get_all: bool, +} + +impl ExternalDataViewing { + /// Display external data values based on the command line arguments + pub async fn print_data(&self, runtime: &spfs::runtime::Runtime) -> Result<()> { + if self.get_all { + let data = runtime.all_external_data().await?; + let keys = data + .keys() + .map(ToString::to_string) + .collect::>(); + let num_keys = keys.len(); + tracing::debug!( + "{num_keys} external data {}: {}", + "key".pluralize(num_keys), + keys.join(", ") + ); + println!("{}", serde_yaml::to_string(&data)?); + } else if let Some(keys) = &self.get { + tracing::debug!("--get these keys: {}", keys.join(", ")); + for key in keys.iter() { + match runtime.external_data(key).await? { + Some(value) => { + tracing::debug!("{key} = {value}"); + println!("{value}"); + } + None => { + tracing::warn!("No external data stored under: {key}"); + println!(); + } + } + } + } + + Ok(()) + } +} + /// Trait all spfs cli command parsers must implement to provide the /// name of the spfs command that has been parsed. This method will be /// called when configuring sentry. 
diff --git a/crates/spfs-cli/common/src/lib.rs b/crates/spfs-cli/common/src/lib.rs index 6353e29b69..8820b61305 100644 --- a/crates/spfs-cli/common/src/lib.rs +++ b/crates/spfs-cli/common/src/lib.rs @@ -10,6 +10,6 @@ pub mod __private { pub use {libc, spfs}; } -pub use args::{capture_if_relevant, CommandName, Logging, Render, Sync}; +pub use args::{capture_if_relevant, CommandName, ExternalDataViewing, Logging, Render, Sync}; #[cfg(feature = "sentry")] pub use args::{configure_sentry, shutdown_sentry}; diff --git a/crates/spfs-cli/main/Cargo.toml b/crates/spfs-cli/main/Cargo.toml index 243303f5ea..27eb3fe7c7 100644 --- a/crates/spfs-cli/main/Cargo.toml +++ b/crates/spfs-cli/main/Cargo.toml @@ -29,6 +29,7 @@ nix = { workspace = true } number_prefix = "*" # we hope to match versions with indicatif relative-path = "1.3" serde_json = { workspace = true } +serde_yaml = { workspace = true } spfs = { path = "../../spfs" } spfs-cli-common = { path = "../common" } strum = { workspace = true, features = ["derive"] } @@ -49,3 +50,7 @@ features = [ "Win32_System_SystemInformation", "Win32_System_Threading", ] + +[dev-dependencies] +rstest = { workspace = true } +tempfile = { workspace = true } diff --git a/crates/spfs-cli/main/src/cmd_info.rs b/crates/spfs-cli/main/src/cmd_info.rs index 611c03952d..6f930d0df9 100644 --- a/crates/spfs-cli/main/src/cmd_info.rs +++ b/crates/spfs-cli/main/src/cmd_info.rs @@ -19,6 +19,9 @@ pub struct CmdInfo { #[clap(flatten)] logging: cli::Logging, + #[clap(flatten)] + external_data: cli::ExternalDataViewing, + /// Lists file sizes in human readable format #[clap(long, short = 'H')] human_readable: bool, @@ -179,6 +182,15 @@ impl CmdInfo { ); } Object::Tree(_) | Object::Mask => println!("{obj:?}"), + Object::ExternalDataLayer(obj) => { + println!( + "{}:\n{}", + self.format_digest(obj.digest()?, repo).await?, + "external data layer:".green() + ); + println!(" {} {}", "key:".bright_blue(), obj.key); + println!(" {} {:?}", "value:".bright_blue(), 
obj.value); + } } Ok(()) } @@ -187,6 +199,10 @@ impl CmdInfo { async fn print_global_info(&self, repo: &spfs::storage::RepositoryHandle) -> Result<()> { let runtime = spfs::active_runtime().await?; + if self.external_data.get_all || self.external_data.get.is_some() { + return self.external_data.print_data(&runtime).await; + } + println!("{}", "Active Runtime:".green()); println!(" {}: {}", "id:".bright_blue(), runtime.name()); println!( diff --git a/crates/spfs-cli/main/src/cmd_read.rs b/crates/spfs-cli/main/src/cmd_read.rs index b9118a76f6..ab662edff6 100644 --- a/crates/spfs-cli/main/src/cmd_read.rs +++ b/crates/spfs-cli/main/src/cmd_read.rs @@ -32,6 +32,16 @@ impl CmdRead { use spfs::graph::Object; let blob = match item { Object::Blob(blob) => blob, + Object::ExternalDataLayer(external_data_layer) => { + use spfs::graph::ExternalDataValue; + match &external_data_layer.value { + ExternalDataValue::String(value) => { + println!("{value}"); + return Ok(0); + } + ExternalDataValue::Blob(digest) => repo.read_blob(*digest).await?, + } + } _ => { let path = match &self.path { None => { diff --git a/crates/spfs-cli/main/src/cmd_run.rs b/crates/spfs-cli/main/src/cmd_run.rs index ad238e3191..7bbc5c3914 100644 --- a/crates/spfs-cli/main/src/cmd_run.rs +++ b/crates/spfs-cli/main/src/cmd_run.rs @@ -2,15 +2,105 @@ // SPDX-License-Identifier: Apache-2.0 // https://github.com/imageworks/spk +use std::collections::BTreeMap; use std::ffi::OsString; +use std::io; use std::time::Instant; -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use clap::{ArgGroup, Args}; +use spfs::runtime::KeyValuePair; use spfs::storage::FromConfig; use spfs::tracking::EnvSpec; use spfs_cli_common as cli; +#[cfg(test)] +#[path = "./cmd_run_test.rs"] +mod cmd_run_test; +#[cfg(test)] +#[path = "./fixtures.rs"] +mod fixtures; + +#[derive(Args, Clone, Debug)] +pub struct ExternalData { + /// Adds extra external key-value string data to the new runtime. 
+ /// + /// This allows external processes to store arbitrary data in the + /// runtimes they create. This is most useful with durable runtimes. + /// The data can be retrieved by running `spfs runtime info` or + /// `spfs info` with the `--get ` or `--get-all` arguments + /// + /// External data is key/value string pairs separated by either an + /// equals sign or colon (--external-data name=value --external-data + /// other:value). Additionally, many pairs of external data can be + /// specified at once in yaml or json format (--external-data '{name: + /// value, other: value}'). + /// + /// External data can also be given in a json or yaml file via the + /// `--external-data-file ` argument. If given, `--external-data` + /// will supersede anything given in external data files. + /// + /// If the same key is used more than once, the last key-value pair + /// will override the earlier values for the same key. + #[clap(long, value_name = "KEY:VALUE")] + pub external_data: Vec, + + /// Specify external extra key-value data from a json or yaml file + /// (see --external-data) + #[clap(long)] + pub external_data_file: Vec, +} + +impl ExternalData { + // Returns a list of external data key-value pairs gathered from all + // the external data related command line arguments. The same keys, + // and values, can appear multiple times in the list if specified + // multiple times in various command line arguments. + pub fn get_data(&self) -> Result> { + let mut data: Vec = Vec::new(); + + for filename in self.external_data_file.iter() { + let reader: Box = + if Ok("-".to_string()) == filename.clone().into_os_string().into_string() { + // Treat '-' as "read from stdin" + Box::new(io::stdin()) + } else { + Box::new(std::fs::File::open(filename).with_context(|| { + format!("Failed to open external data file: {filename:?}") + })?) 
+ }; + let external_data: BTreeMap = serde_yaml::from_reader(reader) + .with_context(|| { + format!("Failed to parse as external data key-value pairs: {filename:?}") + })?; + data.extend(external_data); + } + + for pair in self.external_data.iter() { + let pair = pair.trim(); + if pair.starts_with('{') { + let given: BTreeMap = serde_yaml::from_str(pair) + .context("--external-data value looked like yaml, but could not be parsed")?; + data.extend(given); + continue; + } + + let (name, value) = pair + .split_once('=') + .or_else(|| pair.split_once(':')) + .ok_or_else(|| { + anyhow!( + "Invalid option: -external-data {pair} (should be in the form name=value)" + ) + })?; + + data.push((name.to_string(), value.to_string())); + } + + Ok(data) + } +} + /// Run a program in a configured spfs environment #[derive(Debug, Args)] #[clap(group( @@ -51,6 +141,9 @@ pub struct CmdRun { #[clap(long, value_name = "RUNTIME_NAME")] pub rerun: Option, + #[clap(flatten)] + pub external_data: ExternalData, + /// The tag or id of the desired runtime /// /// Use '-' to request an empty environment @@ -146,6 +239,16 @@ impl CmdRun { ); } + let data = self.external_data.get_data()?; + if !data.is_empty() { + tracing::debug!("with extra external data: {data:?}"); + for (key, value) in data { + runtime + .add_external_data(key, value, config.filesystem.external_data_size_limit) + .await?; + } + } + let start_time = Instant::now(); runtime.config.mount_backend = config.filesystem.backend; runtime.config.secondary_repositories = config.get_secondary_runtime_repositories(); diff --git a/crates/spfs-cli/main/src/cmd_run_test.rs b/crates/spfs-cli/main/src/cmd_run_test.rs new file mode 100644 index 0000000000..52b60af3a1 --- /dev/null +++ b/crates/spfs-cli/main/src/cmd_run_test.rs @@ -0,0 +1,196 @@ +// Copyright (c) Sony Pictures Imageworks, et al. 
+// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk + +use std::fs; + +use rstest::rstest; + +use super::fixtures::*; +use super::ExternalData; + +// Test - --external-data field1:value1 --external-data field2:value2 +// Test - --external-data field1=value1 --external-data field2=value2 +// Test - --external-data field1:value1 --external-data field2=value2 +// Test - --external-data field1=value1 --external-data field2:value2 +// Test - --external-data '{ field1: value1, field2: value2 }' +#[rstest] +#[case(vec!["field1:value1".to_string(), "field2:value2".to_string()])] +#[case(vec!["field1=value1".to_string(), "field2=value2".to_string()])] +#[case(vec!["field1:value1".to_string(), "field2=value2".to_string()])] +#[case(vec!["field1=value1".to_string(), "field2:value2".to_string()])] +#[case(vec!["{field1: value1, field2: value2}".to_string()])] +fn test_cmd_run_create_external_data(#[case] values: Vec) { + // Setup some data for the key value pairs + let field1 = "field1".to_string(); + let field2 = "field2".to_string(); + let value1 = "value1".to_string(); + let value2 = "value2".to_string(); + + let filenames = Vec::new(); + + // Test - --external-data field1:value1 --external-data field2:value2 + let data = ExternalData { + external_data: values, + external_data_file: filenames, + }; + + let result = data + .get_data() + .expect("unable to parse cmd_run --external-data values"); + assert!(!result.is_empty()); + assert!(result.len() == 2); + assert!(result[0] == (field1, value1)); + assert!(result[1] == (field2, value2)); +} + +#[rstest] +#[should_panic] +#[case(vec!["field1,value1".to_string(), "field2/value2".to_string()])] +#[should_panic] +#[case(vec!["field1 value1".to_string(), "field2 value2".to_string()])] +#[should_panic] +#[case(vec!["{field1: value1, field2: - value2 - value1}".to_string()])] +fn test_cmd_run_create_external_data_invalid(#[case] invalid_values: Vec) { + let filenames = Vec::new(); + + // Test - --external-data 
with_an_invalid_argument_value + let data = ExternalData { + external_data: invalid_values, + external_data_file: filenames, + }; + + let _result = data + .get_data() + .expect("unable to parse cmd_run --exteranl-data values"); +} + +#[rstest] +fn test_cmd_run_create_external_data_from_file(tmpdir: tempfile::TempDir) { + // Setup some data for the key value pairs + let field1 = "field1".to_string(); + let field2 = "field2".to_string(); + let value1 = "value1".to_string(); + let value2 = "value2".to_string(); + let external_data = format!("{field1}: {value1}\n{field2}: {value2}\n"); + + let filename = tmpdir.path().join("filename.yaml"); + fs::write(filename.clone(), external_data) + .expect("Unable to write external data to file during setup"); + let filenames = vec![filename]; + + let values = Vec::new(); + + // Test - --external-data-file filename.yaml + let data = ExternalData { + external_data: values, + external_data_file: filenames, + }; + + let result = data + .get_data() + .expect("unable to parse cmd_run --external-data values"); + assert!(!result.is_empty()); + assert!(result.len() == 2); + assert!(result[0] == (field1, value1)); + assert!(result[1] == (field2, value2)); +} + +#[rstest] +#[should_panic] +fn test_cmd_run_create_external_data_from_file_not_exist(tmpdir: tempfile::TempDir) { + // Setup a file name that does not exist + let filename = tmpdir.path().join("nosuchfile.yaml"); + let filenames = vec![filename]; + + let values = Vec::new(); + + // Test - --external-data-file nosuchfile.yaml + let data = ExternalData { + external_data: values, + external_data_file: filenames, + }; + + let _result = data + .get_data() + .expect("unable to parse cmd_run --external-data values"); +} + +#[rstest] +#[should_panic] +fn test_cmd_run_create_external_data_from_file_invalid_keyvalues(tmpdir: tempfile::TempDir) { + // Setup some data for the key value pairs + let field1 = "field1".to_string(); + let field2 = "field2".to_string(); + let value1 = 
"value1".to_string(); + let value2 = "value2".to_string(); + let external_data = format!("{field1}: {value1}\n{field2}:\n - {value2}\n - {value1}\n"); + + let filename = tmpdir.path().join("filename.yaml"); + fs::write(filename.clone(), external_data) + .expect("Unable to write external data to file during setup"); + let filenames = vec![filename]; + + let values = Vec::new(); + + // Test - --external-data-file filename.yaml that contains more than key-value string pairs + let data = ExternalData { + external_data: values, + external_data_file: filenames, + }; + + let _result = data + .get_data() + .expect("unable to parse cmd_run --external-data values"); +} + +#[rstest] +fn test_cmd_run_create_external_data_all(tmpdir: tempfile::TempDir) { + // Setup some data for the key value pairs + let field1 = "field1".to_string(); + let field2 = "field2".to_string(); + let field3 = "field3".to_string(); + let field4 = "field4".to_string(); + + let value1 = "value1".to_string(); + let value2 = "value2".to_string(); + let value3 = "value3".to_string(); + let value4 = "value4".to_string(); + + let values: Vec = vec![ + format!("{field1}:{value1}"), + format!("{field2}={value2}"), + "{field3: value3, field4: value4}".to_string(), + ]; + + let external_data = format!("{field4}: {value4}\n{field3}: {value3}\n"); + + let filename = tmpdir.path().join("filename.yaml"); + fs::write(filename.clone(), external_data) + .expect("Unable to write external data to file during setup"); + let filenames = vec![filename]; + + // Test - --external-data field1:value1 --external-data field2=value2 + // --external-data '{ field3: value3, field4: value4 }' + // --external-data-file filename.yaml + let data = ExternalData { + external_data: values, + external_data_file: filenames, + }; + + let result = data + .get_data() + .expect("unable to parse cmd_run --external-data values"); + + assert!(!result.is_empty()); + assert!(result.len() == 6); + // from --external-data-file filename.yaml + 
assert!(result[0] == (field3.clone(), value3.clone())); + assert!(result[1] == (field4.clone(), value4.clone())); + // from --external-data field1:value1 --external-data field2=value2 + assert!(result[2] == (field1, value1)); + assert!(result[3] == (field2, value2)); + // from --external-data '{field3: value3, field4: value4}' + assert!(result[4] == (field3, value3)); + assert!(result[5] == (field4, value4)); +} diff --git a/crates/spfs-cli/main/src/cmd_runtime_info.rs b/crates/spfs-cli/main/src/cmd_runtime_info.rs index 8c30dd0bd1..c74310260d 100644 --- a/crates/spfs-cli/main/src/cmd_runtime_info.rs +++ b/crates/spfs-cli/main/src/cmd_runtime_info.rs @@ -4,6 +4,7 @@ use anyhow::Result; use clap::Args; +use spfs_cli_common as cli; /// Show the complete state of a runtime #[derive(Debug, Args)] @@ -12,6 +13,9 @@ pub struct CmdRuntimeInfo { #[clap(short, long)] remote: Option, + #[clap(flatten)] + external_data: cli::ExternalDataViewing, + /// The name/id of the runtime to remove #[clap(env = "SPFS_RUNTIME")] name: String, @@ -28,6 +32,12 @@ impl CmdRuntimeInfo { }; let runtime = runtime_storage.read_runtime(&self.name).await?; + + if self.external_data.get_all || self.external_data.get.is_some() { + self.external_data.print_data(&runtime).await?; + return Ok(0); + } + serde_json::to_writer_pretty(std::io::stdout(), runtime.data())?; println!(); // the trailing new line is nice for interactive shells diff --git a/crates/spfs-cli/main/src/cmd_shell.rs b/crates/spfs-cli/main/src/cmd_shell.rs index d31047bfef..94b36dbae2 100644 --- a/crates/spfs-cli/main/src/cmd_shell.rs +++ b/crates/spfs-cli/main/src/cmd_shell.rs @@ -7,6 +7,7 @@ use clap::{ArgGroup, Args}; use spfs_cli_common as cli; use super::cmd_run; +use super::cmd_run::ExternalData; /// Enter a subshell in a configured spfs environment #[derive(Debug, Args)] @@ -48,6 +49,9 @@ pub struct CmdShell { #[clap(short, long, env = "SPFS_KEEP_RUNTIME")] pub keep_runtime: bool, + #[clap(flatten)] + pub external_data: 
ExternalData, + /// The tag or id of the desired runtime /// /// Use '-' or nothing to request an empty environment @@ -67,6 +71,7 @@ impl CmdShell { runtime_name: self.runtime_name.clone(), reference: self.reference.clone(), keep_runtime: self.keep_runtime, + external_data: self.external_data.clone(), command: Default::default(), }; run_cmd.run(config).await diff --git a/crates/spfs-cli/main/src/fixtures.rs b/crates/spfs-cli/main/src/fixtures.rs new file mode 100644 index 0000000000..96eee98c40 --- /dev/null +++ b/crates/spfs-cli/main/src/fixtures.rs @@ -0,0 +1,10 @@ +use rstest::fixture; +use tempfile::TempDir; + +#[fixture] +pub fn tmpdir() -> TempDir { + tempfile::Builder::new() + .prefix("spfs-test-") + .tempdir() + .expect("failed to create dir for test") +} diff --git a/crates/spfs/src/check.rs b/crates/spfs/src/check.rs index b20ef3ff5d..ab521e7245 100644 --- a/crates/spfs/src/check.rs +++ b/crates/spfs/src/check.rs @@ -12,6 +12,7 @@ use once_cell::sync::OnceCell; use progress_bar_derive_macro::ProgressBar; use tokio::sync::Semaphore; +use crate::graph::ExternalDataValue; use crate::prelude::*; use crate::sync::{SyncObjectResult, SyncPayloadResult, SyncPolicy}; use crate::{encoding, graph, storage, tracking, Error, Result}; @@ -302,6 +303,9 @@ where Object::Manifest(obj) => CheckObjectResult::Manifest(self.check_manifest(obj).await?), Object::Tree(obj) => CheckObjectResult::Tree(obj), Object::Mask => CheckObjectResult::Mask, + Object::ExternalDataLayer(obj) => { + CheckObjectResult::ExternalDataLayer(self.check_external_data_layer(obj).await?) + } }; self.reporter.checked_object(&res); Ok(res) @@ -359,6 +363,28 @@ where Ok(res) } + /// Validate that the identified external data layer's value exists. 
+ /// + /// To also check if the external data layer object exists, use [`Self::check_digest`] + pub async fn check_external_data_layer( + &self, + external_data_layer: graph::ExternalDataLayer, + ) -> Result { + let res = match external_data_layer.value { + ExternalDataValue::String(_v) => CheckExternalDataLayerResult::Skipped, + ExternalDataValue::Blob(d) => { + let blob = self.repo.read_blob(d).await?; + let result = unsafe { self.check_blob(blob).await? }; + CheckExternalDataLayerResult::Checked { + external_data_layer, + result, + repaired: false, + } + } + }; + Ok(res) + } + /// Validate that the identified blob has its payload. /// /// To also check if the blob object exists, use [`Self::check_digest`] @@ -784,6 +810,7 @@ pub enum CheckObjectResult { Manifest(CheckManifestResult), Tree(graph::Tree), Mask, + ExternalDataLayer(CheckExternalDataLayerResult), } impl CheckObjectResult { @@ -800,6 +827,7 @@ impl CheckObjectResult { CheckObjectResult::Manifest(r) => r.set_repaired(), CheckObjectResult::Tree(_) => (), CheckObjectResult::Mask => (), + CheckObjectResult::ExternalDataLayer(r) => r.set_repaired(), } } @@ -817,6 +845,7 @@ impl CheckObjectResult { Blob(res) => res.summary(), Manifest(res) => res.summary(), Mask | Tree(_) => CheckSummary::default(), + ExternalDataLayer(res) => res.summary(), } } } @@ -910,6 +939,33 @@ impl CheckEntryResult { } } +#[derive(Debug)] +pub enum CheckExternalDataLayerResult { + /// The entry was not one that needed checking + Skipped, + /// The entry was checked + Checked { + external_data_layer: graph::ExternalDataLayer, + result: CheckBlobResult, + repaired: bool, + }, +} + +impl CheckExternalDataLayerResult { + fn set_repaired(&mut self) { + if let Self::Checked { repaired, .. } = self { + *repaired = true; + } + } + + pub fn summary(&self) -> CheckSummary { + match self { + Self::Skipped => CheckSummary::default(), + Self::Checked { result, .. 
} => result.summary(), + } + } +} + #[derive(Debug)] pub enum CheckBlobResult { /// The blob was already checked in this session diff --git a/crates/spfs/src/config.rs b/crates/spfs/src/config.rs index 1610a1dd54..eca189e2d9 100644 --- a/crates/spfs/src/config.rs +++ b/crates/spfs/src/config.rs @@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize}; use storage::{FromConfig, FromUrl}; use tokio_stream::StreamExt; +use crate::graph::SPFS_EXTERNAL_DATA_LAYER_MAX_STRING_VALUE_SIZE; use crate::{runtime, storage, tracking, Result}; #[cfg(test)] @@ -248,6 +249,13 @@ pub struct Filesystem { /// systems that can perform read-through lookups, such as FUSE. #[serde(default = "Filesystem::default_secondary_repositories")] pub secondary_repositories: Vec, + + /// The size limit for external data before the data is stored in + /// a separate blob payload referenced in an external data + /// layer. Data values smaller than or equal to this are stored + /// directly in the external data layer. + #[serde(default = "Filesystem::default_external_data_size_limit")] + pub external_data_size_limit: usize, } impl Filesystem { @@ -256,6 +264,13 @@ impl Filesystem { pub fn default_secondary_repositories() -> Vec { vec![String::from("origin")] } + + /// The default size limit for a piece of external data before it + /// is stored in a separate blob payload from the external data + /// layer that contains it + pub fn default_external_data_size_limit() -> usize { + SPFS_EXTERNAL_DATA_LAYER_MAX_STRING_VALUE_SIZE + } } /// Configuration options for the fuse filesystem process diff --git a/crates/spfs/src/find_path.rs b/crates/spfs/src/find_path.rs index e83cc61727..cee5a5823b 100644 --- a/crates/spfs/src/find_path.rs +++ b/crates/spfs/src/find_path.rs @@ -34,6 +34,7 @@ impl ObjectPathEntry { Object::Blob(obj) => Ok(obj.digest()), Object::Tree(obj) => obj.digest(), Object::Mask => Err(Error::String("spfs Mask object has no digest".to_string())), + Object::ExternalDataLayer(_) => obj.digest(), 
}, ObjectPathEntry::FilePath(entry) => Ok(entry.object), } @@ -120,7 +121,7 @@ async fn find_path_in_spfs_item( } } - Object::Blob(_) | Object::Tree(_) | Object::Mask => { + Object::Blob(_) | Object::Tree(_) | Object::Mask | Object::ExternalDataLayer(_) => { // These are not examined here when searching for the // filepath because the filepath will be found by walking // Manifest objects. diff --git a/crates/spfs/src/graph/external_data_layer.rs b/crates/spfs/src/graph/external_data_layer.rs new file mode 100644 index 0000000000..000cc948ff --- /dev/null +++ b/crates/spfs/src/graph/external_data_layer.rs @@ -0,0 +1,119 @@ +// Copyright (c) Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk + +use std::io::BufRead; + +use crate::{encoding, Error, Result}; + +#[cfg(test)] +#[path = "./external_data_layer_test.rs"] +mod external_data_layer_test; + +// Default size limit for string values stored directly in an external +// data layer. Values larger than this are stored in a blob payload +// that is referenced from the external data layer. +pub const SPFS_EXTERNAL_DATA_LAYER_MAX_STRING_VALUE_SIZE: usize = 16 * 1024; + +/// The ways external data values are stored. +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum ExternalDataValue { + /// In the ExternalDataLayer object as a string + String(String), + /// In a separate blob payload pointed at by the digest + Blob(encoding::Digest), +} + +impl ExternalDataValue { + pub fn is_string(&self) -> bool { + matches!(self, Self::String(_)) + } + pub fn is_blob(&self) -> bool { + matches!(self, Self::Blob(_)) + } +} + +impl encoding::Encodable for ExternalDataValue { + type Error = Error; + + fn encode(&self, mut writer: &mut impl std::io::Write) -> Result<()> { + match self { + ExternalDataValue::String(v) => { + encoding::write_uint(&mut writer, 1)?; + Ok(encoding::write_string(writer, v.as_str())?) 
+ } + ExternalDataValue::Blob(v) => { + encoding::write_uint(&mut writer, 2)?; + Ok(encoding::write_digest(writer, v)?) + } + } + } +} + +impl encoding::Decodable for ExternalDataValue { + fn decode(mut reader: &mut impl BufRead) -> Result { + let value_id = encoding::read_uint(&mut reader)?; + match value_id { + 1 => Ok(ExternalDataValue::String(encoding::read_string( + &mut reader, + )?)), + 2 => Ok(ExternalDataValue::Blob(encoding::read_digest(&mut reader)?)), + n => Err(format!( + "Cannot read ExternalDataLayer object: unknown value enum {n}. Should be 1 or 2" + ) + .into()), + } + } +} + +/// ExternalDataLayers represent a key-value pair of data from an +/// external program stored for injection into a spfs runtime. +#[derive(Debug, Eq, PartialEq, Clone)] +pub struct ExternalDataLayer { + pub key: String, + pub value: ExternalDataValue, +} + +impl ExternalDataLayer { + pub fn new(key: String, value: ExternalDataValue) -> Self { + ExternalDataLayer { key, value } + } + + /// Return the child object of this one in the object DG. + pub fn child_objects(&self) -> Vec { + // TODO: should this return the digest if the value is a blob + // digest, and return an empty list when it's a string? + Vec::new() + } + + pub fn size(&self) -> u64 { + match &self.value { + ExternalDataValue::String(v) => v.len() as u64, + ExternalDataValue::Blob(d) => { + // TODO: this is the digest size and it should load + // the payload and get the size of it, or store the + // size in the external data layer? 
+ d.len() as u64 + } + } + } +} + +impl encoding::Encodable for ExternalDataLayer { + type Error = Error; + + fn encode(&self, mut writer: &mut impl std::io::Write) -> Result<()> { + encoding::write_string(&mut writer, self.key.as_str())?; + self.value.encode(&mut writer)?; + Ok(()) + } +} + +impl encoding::Decodable for ExternalDataLayer { + fn decode(mut reader: &mut impl BufRead) -> Result { + Ok(ExternalDataLayer { + key: encoding::read_string(&mut reader)?, + value: ExternalDataValue::decode(&mut reader)?, + }) + } +} diff --git a/crates/spfs/src/graph/external_data_layer_test.rs b/crates/spfs/src/graph/external_data_layer_test.rs new file mode 100644 index 0000000000..8f2cdc3b34 --- /dev/null +++ b/crates/spfs/src/graph/external_data_layer_test.rs @@ -0,0 +1,101 @@ +// Copyright (c) Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk + +use rstest::rstest; +use spfs_encoding::Digest; + +use super::{ExternalDataLayer, ExternalDataValue}; +use crate::encoding; +use crate::encoding::prelude::*; + +#[rstest] +fn test_externaldatavalue_string() { + let value = String::from("value"); + let string_value = ExternalDataValue::String(value); + + assert!(string_value.is_string()); + assert!(!string_value.is_blob()); +} + +#[rstest] +fn test_externaldatavalue_string_encoding() { + let value = String::from("value"); + let string_value = ExternalDataValue::String(value); + + let mut stream = Vec::new(); + string_value.encode(&mut stream).unwrap(); + let actual = ExternalDataValue::decode(&mut stream.as_slice()).unwrap(); + assert_eq!(actual.digest().unwrap(), string_value.digest().unwrap()) +} + +#[rstest] +fn test_externaldatavalue_blob() { + let digest: Digest = encoding::EMPTY_DIGEST.into(); + + let blob_value = ExternalDataValue::Blob(digest); + + assert!(blob_value.is_blob()); + assert!(!blob_value.is_string()); +} + +#[rstest] +fn test_externaldatavalue_blob_encoding() { + let digest: Digest = 
encoding::EMPTY_DIGEST.into(); + + let blob_value = ExternalDataValue::Blob(digest); + + let mut stream = Vec::new(); + blob_value.encode(&mut stream).unwrap(); + let actual = ExternalDataValue::decode(&mut stream.as_slice()).unwrap(); + assert_eq!(actual.digest().unwrap(), blob_value.digest().unwrap()) +} + +#[rstest] +fn test_externaldatalayer_string() { + let key = String::from("key"); + let value = String::from("value"); + + let data = ExternalDataLayer { + key: key.clone(), + value: ExternalDataValue::String(value.clone()), + }; + + assert_eq!(data.key, key); + match data.value { + ExternalDataValue::String(v) => assert_eq!(v, value), + ExternalDataValue::Blob(_) => panic!("Expected a ExternalDataValue::String value"), + } +} + +#[rstest] +fn test_externaldatalayer_string_encoding() { + let key = String::from("key"); + let value = String::from("value"); + + let data = ExternalDataLayer { + key: key.clone(), + value: ExternalDataValue::String(value.clone()), + }; + + let mut stream = Vec::new(); + data.encode(&mut stream).unwrap(); + let actual = ExternalDataLayer::decode(&mut stream.as_slice()).unwrap(); + assert_eq!(actual.digest().unwrap(), data.digest().unwrap()) +} + +#[rstest] +fn test_externaldatalayer_blob() { + let key = String::from("key"); + let digest: Digest = encoding::EMPTY_DIGEST.into(); + + let data = ExternalDataLayer { + key: key.clone(), + value: ExternalDataValue::Blob(digest), + }; + + let mut stream = Vec::new(); + data.encode(&mut stream).unwrap(); + let actual = ExternalDataLayer::decode(&mut stream.as_slice()).unwrap(); + assert_eq!(actual.digest().unwrap(), data.digest().unwrap()) +} diff --git a/crates/spfs/src/graph/mod.rs b/crates/spfs/src/graph/mod.rs index 1fada34a99..e48372b42f 100644 --- a/crates/spfs/src/graph/mod.rs +++ b/crates/spfs/src/graph/mod.rs @@ -7,6 +7,7 @@ mod blob; mod database; mod entry; +mod external_data_layer; mod layer; mod manifest; mod object; @@ -22,6 +23,11 @@ pub use database::{ DigestSearchCriteria, }; 
pub use entry::Entry; +pub use external_data_layer::{ + ExternalDataLayer, + ExternalDataValue, + SPFS_EXTERNAL_DATA_LAYER_MAX_STRING_VALUE_SIZE, +}; pub use layer::Layer; pub use manifest::Manifest; pub use object::{Object, ObjectKind}; diff --git a/crates/spfs/src/graph/object.rs b/crates/spfs/src/graph/object.rs index 5031c0fa2f..6fe9c80217 100644 --- a/crates/spfs/src/graph/object.rs +++ b/crates/spfs/src/graph/object.rs @@ -6,7 +6,7 @@ use std::io::BufRead; use strum::Display; -use super::{Blob, Layer, Manifest, Platform, Tree}; +use super::{Blob, ExternalDataLayer, Layer, Manifest, Platform, Tree}; use crate::storage::RepositoryHandle; use crate::{encoding, Error}; @@ -18,6 +18,7 @@ pub enum Object { Tree(Tree), Blob(Blob), Mask, + ExternalDataLayer(ExternalDataLayer), } impl Object { @@ -29,6 +30,7 @@ impl Object { Self::Tree(tree) => tree.entries.iter().map(|e| e.object).collect(), Self::Blob(_blob) => Vec::new(), Self::Mask => Vec::new(), + Self::ExternalDataLayer(_external_data) => Vec::new(), } } @@ -41,6 +43,7 @@ impl Object { Self::Platform(_) => ObjectKind::Platform, Self::Tree(_) => ObjectKind::Tree, Self::Mask => ObjectKind::Mask, + Self::ExternalDataLayer(_) => ObjectKind::ExternalDataLayer, } } @@ -80,6 +83,7 @@ impl Object { } Object::Blob(object) => total_size += object.size, Object::Mask => (), + Object::ExternalDataLayer(object) => total_size += object.size(), } } items_to_process = std::mem::take(&mut next_iter_objects); @@ -113,6 +117,11 @@ impl From for Object { Self::Blob(blob) } } +impl From for Object { + fn from(external_data_layer: ExternalDataLayer) -> Self { + Self::ExternalDataLayer(external_data_layer) + } +} /// Identifies the kind of object this is for the purposes of encoding #[derive(Debug)] @@ -123,6 +132,7 @@ pub enum ObjectKind { Platform = 3, Tree = 4, Mask = 5, + ExternalDataLayer = 6, } impl ObjectKind { @@ -134,6 +144,7 @@ impl ObjectKind { 3 => Some(Self::Platform), 4 => Some(Self::Tree), 5 => Some(Self::Mask), + 6 => 
Some(Self::ExternalDataLayer), _ => None, } } @@ -152,6 +163,7 @@ impl encoding::Encodable for Object { Self::Tree(obj) => obj.digest(), Self::Blob(obj) => Ok(obj.digest()), Self::Mask => Ok(encoding::EMPTY_DIGEST.into()), + Self::ExternalDataLayer(obj) => obj.digest(), } } @@ -165,6 +177,7 @@ impl encoding::Encodable for Object { Self::Platform(obj) => obj.encode(&mut writer), Self::Tree(obj) => obj.encode(&mut writer), Self::Mask => Ok(()), + Self::ExternalDataLayer(obj) => obj.encode(&mut writer), } } } @@ -180,6 +193,9 @@ impl encoding::Decodable for Object { Some(ObjectKind::Platform) => Ok(Self::Platform(Platform::decode(&mut reader)?)), Some(ObjectKind::Tree) => Ok(Self::Tree(Tree::decode(&mut reader)?)), Some(ObjectKind::Mask) => Ok(Self::Mask), + Some(ObjectKind::ExternalDataLayer) => Ok(Self::ExternalDataLayer( + ExternalDataLayer::decode(&mut reader)?, + )), None => Err(format!("Cannot read object: unknown object kind {type_id}").into()), } } diff --git a/crates/spfs/src/io.rs b/crates/spfs/src/io.rs index 6f396285df..9fc4e0cf01 100644 --- a/crates/spfs/src/io.rs +++ b/crates/spfs/src/io.rs @@ -166,6 +166,7 @@ pub async fn pretty_print_filepath( Object::Blob(_) => "blob", Object::Tree(_) => "tree", Object::Mask => "mask", + Object::ExternalDataLayer(_) => "external data", }; println!( diff --git a/crates/spfs/src/proto/conversions.rs b/crates/spfs/src/proto/conversions.rs index 1d1b081b49..b9257ce29e 100644 --- a/crates/spfs/src/proto/conversions.rs +++ b/crates/spfs/src/proto/conversions.rs @@ -146,6 +146,7 @@ impl From<&graph::Object> for super::Object { graph::Object::Tree(o) => Kind::Tree(o.into()), graph::Object::Blob(o) => Kind::Blob(o.into()), graph::Object::Mask => Kind::Mask(true), + graph::Object::ExternalDataLayer(o) => Kind::ExternalDataLayer(o.into()), }), } } @@ -171,6 +172,7 @@ impl TryFrom for graph::Object { Some(Kind::Tree(o)) => Ok(graph::Object::Tree(o.try_into()?)), Some(Kind::Blob(o)) => Ok(graph::Object::Blob(o.try_into()?)), 
Some(Kind::Mask(_)) => Ok(graph::Object::Mask), + Some(Kind::ExternalDataLayer(o)) => Ok(graph::Object::ExternalDataLayer(o.try_into()?)), None => Err(Error::String( "Expected non-empty object kind in rpc message".to_string(), )), @@ -353,6 +355,34 @@ impl TryFrom for storage::EntryType { } } +impl From<&graph::ExternalDataLayer> for super::ExternalDataLayer { + fn from(source: &graph::ExternalDataLayer) -> Self { + use super::external_data_layer::Value; + Self { + key: source.key.clone(), + value: match &source.value { + graph::ExternalDataValue::String(s) => Some(Value::Data(s.clone())), + graph::ExternalDataValue::Blob(d) => Some(Value::Digest(d.into())), + }, + } + } +} + +impl TryFrom for graph::ExternalDataLayer { + type Error = Error; + fn try_from(source: super::ExternalDataLayer) -> Result { + use super::external_data_layer::Value; + Ok(Self { + key: source.key.clone(), + value: match source.value { + Some(Value::Data(s)) => graph::ExternalDataValue::String(s.clone()), + Some(Value::Digest(d)) => graph::ExternalDataValue::Blob(convert_digest(Some(d))?), + None => graph::ExternalDataValue::String(Default::default()), + }, + }) + } +} + impl From for super::DigestSearchCriteria { fn from(search_criteria: graph::DigestSearchCriteria) -> Self { Self { diff --git a/crates/spfs/src/proto/defs/types.proto b/crates/spfs/src/proto/defs/types.proto index 834b500038..6fa82465d6 100644 --- a/crates/spfs/src/proto/defs/types.proto +++ b/crates/spfs/src/proto/defs/types.proto @@ -21,6 +21,7 @@ message Object { Tree tree = 4; Blob blob = 5; bool mask = 6; + ExternalDataLayer external_data_layer = 7; } } @@ -59,3 +60,16 @@ message Blob { Digest payload = 1; uint64 size = 2; } + +message ExternalDataLayer { + string key = 1; + oneof value { + string data = 2; + Digest digest = 3; + } +} + +enum ExternalDataValue { + STRING = 0; + DIGEST = 1; +} \ No newline at end of file diff --git a/crates/spfs/src/resolve.rs b/crates/spfs/src/resolve.rs index a010daa26a..51ef52afa6 
100644 --- a/crates/spfs/src/resolve.rs +++ b/crates/spfs/src/resolve.rs @@ -434,6 +434,10 @@ where graph::Object::Manifest(manifest) => { layers.push(graph::Layer::new(manifest.digest().unwrap())) } + graph::Object::ExternalDataLayer(_external_data_layer) => { + // TODO: turn into a file named for the key and + // containing the value. For now this is a placeholder. + } obj => { return Err(format!( "Cannot resolve object into a mountable filesystem layer: {:?}", diff --git a/crates/spfs/src/runtime/mod.rs b/crates/spfs/src/runtime/mod.rs index 071a9e4b10..8c59f71d45 100644 --- a/crates/spfs/src/runtime/mod.rs +++ b/crates/spfs/src/runtime/mod.rs @@ -24,6 +24,7 @@ pub use storage::{ BindMount, Config, Data, + KeyValuePair, LiveLayer, LiveLayerFile, MountBackend, diff --git a/crates/spfs/src/runtime/storage.rs b/crates/spfs/src/runtime/storage.rs index 2cd6931b8c..500c418b42 100644 --- a/crates/spfs/src/runtime/storage.rs +++ b/crates/spfs/src/runtime/storage.rs @@ -4,7 +4,7 @@ //! Definition and persistent storage of runtimes. 
-use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::env::temp_dir; use std::fmt::Display; use std::fs::OpenOptions; @@ -27,8 +27,9 @@ use tokio::io::AsyncReadExt; use super::startup_ps; #[cfg(unix)] use super::{startup_csh, startup_sh}; -use crate::encoding::{self, Encodable}; +use crate::encoding::{Digest, Encodable}; use crate::env::SPFS_DIR_PREFIX; +use crate::graph::{ExternalDataLayer, ExternalDataValue}; use crate::prelude::*; use crate::storage::fs::DURABLE_EDITS_DIR; use crate::storage::RepositoryHandle; @@ -52,6 +53,9 @@ const TRANSIENT: bool = false; const LIVE_LAYER_FILE_SUFFIX_YAML: &str = ".spfs.yaml"; const DEFAULT_LIVE_LAYER_FILENAME: &str = "layer.spfs.yaml"; +/// Data type for pairs of external data keys and values +pub type KeyValuePair = (String, String); + /// Information about the source of a runtime #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct Author { @@ -74,11 +78,11 @@ impl Default for Author { #[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)] pub struct Status { /// The set of layers that are being used in this runtime - pub stack: Vec, + pub stack: Vec, /// Additional layers that were created automatically due to the stack /// being too large. 
#[serde(default, skip_serializing_if = "HashSet::is_empty")] - pub(crate) flattened_layers: HashSet, + pub(crate) flattened_layers: HashSet, /// Whether or not this runtime is editable /// /// An editable runtime is mounted with working directories @@ -685,6 +689,73 @@ impl Runtime { self.data.is_durable() } + /// Store an arbitrary key-value string pair in the runtime + pub async fn add_external_data( + &mut self, + key: String, + value: String, + size_limit: usize, + ) -> Result<()> { + tracing::debug!( + "about to insert external data key: {} => value: {} [len: {} > {}]", + key, + value, + value.len(), + size_limit + ); + + let external_data_layer = if value.len() <= size_limit { + tracing::debug!("about to store value directly in the external data layer"); + ExternalDataLayer { + key, + value: ExternalDataValue::String(value), + } + } else { + tracing::debug!("about to store value in a created blob"); + let digest = self.storage.create_blob_for_string(value).await?; + tracing::debug!("created a blob under {digest}"); + ExternalDataLayer { + key, + value: ExternalDataValue::Blob(digest), + } + }; + + let external_data_layer_digest = external_data_layer.digest()?; + tracing::debug!("external data layer digest: {}", external_data_layer_digest); + self.storage + .save_external_data_layer(external_data_layer) + .await?; + + // The new external data is added to the top of the runtime's stack + self.push_digest(external_data_layer_digest); + Ok(()) + } + + /// Get the string data stored as external data under the given key. + pub async fn external_data(&self, key: &str) -> Result> { + for digest in self.status.stack.iter() { + if let Some(s) = self.storage.get_external_data(digest, key).await? { + return Ok(Some(s)); + } + } + Ok(None) + } + + /// Get all the external data keys and values stored in this runtime.
+ pub async fn all_external_data(&self) -> Result> { + let mut data: BTreeMap = BTreeMap::new(); + // Reversing the stack order ensures that external data layers + // using the same key override each other correctly (highest + // layer, i.e. last one added, overrides the others). + for digest in self.status.stack.iter().rev() { + let pairs = self.storage.get_external_key_value_pairs(digest).await?; + for (key, value) in pairs { + data.insert(key, value); + } + } + Ok(data) + } + /// Reset parts of the runtime's state so it can be reused in /// another process run. pub async fn reinit_for_reuse_and_save_to_storage(&mut self) -> Result<()> { @@ -872,7 +943,7 @@ impl Runtime { /// This will update the configuration of the runtime, /// and change the overlayfs options, but not save the runtime or /// update any currently running environment. - pub fn push_digest(&mut self, digest: encoding::Digest) { + pub fn push_digest(&mut self, digest: Digest) { let mut new_stack = Vec::with_capacity(self.status.stack.len() + 1); new_stack.push(digest); for existing in self.status.stack.drain(..) { @@ -1155,6 +1226,118 @@ impl Storage { OwnedRuntime::upgrade_as_owner(rt).await } + /// Create a new blob payload to hold the given string + pub(crate) async fn create_blob_for_string(&self, payload: String) -> Result { + self.inner + .commit_blob(Box::pin(std::io::Cursor::new(payload.into_bytes()))) + .await + } + + /// Write the given external data layer object to the repository + pub(crate) async fn save_external_data_layer( + &self, + external_data_layer: ExternalDataLayer, + ) -> Result<()> { + self.inner + .write_object(&graph::Object::ExternalDataLayer(external_data_layer)) + .await + } + + /// If the given digest refers to an object that is, or contains, + /// an external data layer that uses the given key, then return + /// the data value from the external data layer.
+ pub(crate) async fn get_external_data( + &self, + digest: &Digest, + key: &str, + ) -> Result> { + let mut digests_to_process: Vec = vec![*digest]; + + while !digests_to_process.is_empty() { + let mut next_iter_digests: Vec = Vec::new(); + for digest in digests_to_process.iter() { + match self.inner.read_ref(digest.to_string().as_str()).await? { + graph::Object::Platform(platform) => { + for reference in platform.stack.iter() { + next_iter_digests.push(*reference); + } + } + graph::Object::ExternalDataLayer(external_data_layer) => { + if external_data_layer.key == key { + let value = self.get_external_data_value(&external_data_layer).await?; + return Ok(Some(value)); + } + } + _ => { + // None of the other objects contain ExternalDataLayers + } + } + } + digests_to_process = std::mem::take(&mut next_iter_digests); + } + Ok(None) + } + + /// If the digest refers to an object that is, or could contain, + /// an external data layer, then return all keys and values from + /// the external data layers that can be found starting from it. + pub(crate) async fn get_external_key_value_pairs( + &self, + digest: &Digest, + ) -> Result> { + let mut key_value_pairs: Vec = Vec::new(); + + let mut digests_to_process: Vec = vec![*digest]; + while !digests_to_process.is_empty() { + let mut next_iter_digests: Vec = Vec::new(); + for digest in digests_to_process.iter() { + match self.inner.read_ref(digest.to_string().as_str()).await? 
{ + graph::Object::Platform(platform) => { + for reference in platform.stack.iter() { + next_iter_digests.push(*reference); + } + } + graph::Object::ExternalDataLayer(external_data_layer) => { + let key = external_data_layer.key.clone(); + let value = self.get_external_data_value(&external_data_layer).await?; + key_value_pairs.push((key, value)); + } + _ => { + // None of the other objects contain ExternalDataLayers + } + } + } + digests_to_process = std::mem::take(&mut next_iter_digests); + } + Ok(key_value_pairs) + } + + /// Get the String value from the given external data layer + async fn get_external_data_value( + &self, + external_data_layer: &ExternalDataLayer, + ) -> Result { + let data = match &external_data_layer.value { + ExternalDataValue::String(s) => s.clone(), + ExternalDataValue::Blob(digest) => { + let blob = self.inner.read_blob(*digest).await?; + let (mut payload, filename) = self.inner.open_payload(blob.digest()).await?; + let mut writer: Vec = vec![]; + tokio::io::copy(&mut payload, &mut writer) + .await + .map_err(|err| { + Error::StorageReadError( + "copy of external data layer payload to string buffer", + filename, + err, + ) + })?; + String::from_utf8(writer).map_err(|err| Error::String(err.to_string()))? 
+ } + }; + Ok(data) + } + pub async fn durable_path(&self, name: String) -> Result { match &*self.inner { RepositoryHandle::FS(repo) => { diff --git a/crates/spfs/src/runtime/storage_test.rs b/crates/spfs/src/runtime/storage_test.rs index b915d2666b..7c49124262 100644 --- a/crates/spfs/src/runtime/storage_test.rs +++ b/crates/spfs/src/runtime/storage_test.rs @@ -10,10 +10,12 @@ use std::str::FromStr; use futures::TryStreamExt; use rstest::rstest; +use spfs_encoding::Encodable; use super::{makedirs_with_perms, Data, Storage}; use crate::encoding; use crate::fixtures::*; +use crate::graph::{ExternalDataLayer, ExternalDataValue, Object, Platform}; use crate::runtime::storage::{LiveLayerApiVersion, LiveLayerContents}; use crate::runtime::{BindMount, LiveLayer, LiveLayerFile}; @@ -171,6 +173,222 @@ async fn test_storage_create_runtime(tmpdir: tempfile::TempDir) { .is_err()); } +#[rstest] +#[tokio::test] +async fn test_storage_runtime_with_external_data(tmpdir: tempfile::TempDir) { + let root = tmpdir.path().to_string_lossy().to_string(); + let repo = crate::storage::RepositoryHandle::from( + crate::storage::fs::FsRepository::create(root) + .await + .unwrap(), + ); + let storage = Storage::new(repo); + let limit: usize = 16 * 1024; + + let keep_runtime = false; + let live_layers = Vec::new(); + let mut runtime = storage + .create_named_runtime("test-with-external-data", keep_runtime, live_layers) + .await + .expect("failed to create runtime in storage"); + + // Test - insert new data + let key = "somefield".to_string(); + let value = "somevalue".to_string(); + assert!(runtime + .add_external_data(key.clone(), value, limit) + .await + .is_ok()); + + // Test - insert over existing data + let value2 = "someothervalue".to_string(); + assert!(runtime + .add_external_data(key.clone(), value2.clone(), limit) + .await + .is_ok()); + + // Test - retrieve data + let result = runtime.external_data(&key).await.unwrap(); + assert!(result.is_some()); + + assert!(value2 == 
*result.unwrap()); +} + +#[rstest] +#[tokio::test] +async fn test_storage_runtime_with_nested_external_data(tmpdir: tempfile::TempDir) { + // Setup the objects needed for the runtime used in the test + let root = tmpdir.path().to_string_lossy().to_string(); + let repo = crate::storage::RepositoryHandle::from( + crate::storage::fs::FsRepository::create(root) + .await + .unwrap(), + ); + + // make an external data layer + let key = "somefield".to_string(); + let value = "somevalue".to_string(); + let external_data_layer = ExternalDataLayer { + key: key.clone(), + value: ExternalDataValue::String(value.clone()), + }; + repo.write_object(&Object::ExternalDataLayer(external_data_layer.clone())) + .await + .unwrap(); + + // make a platform that contains the external data layer + let layers: Vec = vec![external_data_layer.digest().unwrap()]; + let platform = Platform::new(layers.iter()).unwrap(); + repo.write_object(&Object::Platform(platform.clone())) + .await + .unwrap(); + + // put the platform into a runtime + let storage = Storage::new(repo); + let keep_runtime = false; + let live_layers = Vec::new(); + let mut runtime = storage + .create_named_runtime("test-with-external-data-nested", keep_runtime, live_layers) + .await + .expect("failed to create runtime in storage"); + runtime.push_digest(platform.digest().unwrap()); + + // Test - retrieve the data even though it is nested inside a + // platform object in the runtime. 
+ let result = runtime.external_data(&key).await.unwrap(); + assert!(result.is_some()); + + assert!(value == *result.unwrap()); +} + +#[rstest] +#[tokio::test] +async fn test_storage_runtime_with_external_data_all(tmpdir: tempfile::TempDir) { + let root = tmpdir.path().to_string_lossy().to_string(); + let repo = crate::storage::RepositoryHandle::from( + crate::storage::fs::FsRepository::create(root) + .await + .unwrap(), + ); + let storage = Storage::new(repo); + let limit: usize = 16 * 1024; + + let keep_runtime = false; + let live_layers = Vec::new(); + let mut runtime = storage + .create_named_runtime("test-with-external-data-all", keep_runtime, live_layers) + .await + .expect("failed to create runtime in storage"); + + // Test - insert two distinct data values + let key = "somefield".to_string(); + let value = "somevalue".to_string(); + assert!(runtime + .add_external_data(key.clone(), value.clone(), limit) + .await + .is_ok()); + + let key2 = "somefield2".to_string(); + let value2 = "somevalue2".to_string(); + assert!(runtime + .add_external_data(key2.clone(), value2.clone(), limit) + .await + .is_ok()); + + // Test - get all the data back out + let result = runtime.all_external_data().await.unwrap(); + assert!(result.len() == 2); + for (expected_key, expected_value) in [(key, value), (key2, value2)].iter() { + assert!(result.get(expected_key).is_some()); + match result.get(expected_key) { + Some(v) => { + assert!(v == expected_value); + } + None => panic!("Value missing for {expected_key} when getting all external data"), + } + } +} + +#[rstest] +#[tokio::test] +async fn test_storage_runtime_with_nested_external_data_all(tmpdir: tempfile::TempDir) { + // setup the objects needed for the runtime used in the test + let root = tmpdir.path().to_string_lossy().to_string(); + let repo = crate::storage::RepositoryHandle::from( + crate::storage::fs::FsRepository::create(root) + .await + .unwrap(), + ); + + // make two distinct data values + let key = 
"somefield".to_string(); + let value = "somevalue".to_string(); + let external_data_layer = ExternalDataLayer { + key: key.clone(), + value: ExternalDataValue::String(value.clone()), + }; + repo.write_object(&Object::ExternalDataLayer(external_data_layer.clone())) + .await + .unwrap(); + + let key2 = "somefield2".to_string(); + let value2 = "somevalue2".to_string(); + let external_data_layer2 = ExternalDataLayer { + key: key2.clone(), + value: ExternalDataValue::String(value2.clone()), + }; + repo.write_object(&Object::ExternalDataLayer(external_data_layer2.clone())) + .await + .unwrap(); + + // make a platform with one external data layer + let layers: Vec = vec![external_data_layer.digest().unwrap()]; + let platform = Platform::new(layers.iter()).unwrap(); + repo.write_object(&Object::Platform(platform.clone())) + .await + .unwrap(); + + // make another platform with the first platform and the other + // external data layer. this second platform is in the runtime + let layers2: Vec = vec![ + platform.digest().unwrap(), + external_data_layer2.digest().unwrap(), + ]; + let platform2 = Platform::new(layers2.iter()).unwrap(); + repo.write_object(&Object::Platform(platform2.clone())) + .await + .unwrap(); + + // finally set up the runtime + let storage = Storage::new(repo); + + let keep_runtime = false; + let live_layers = Vec::new(); + let mut runtime = storage + .create_named_runtime( + "test-with-external-data-all-nested", + keep_runtime, + live_layers, + ) + .await + .expect("failed to create runtime in storage"); + runtime.push_digest(platform2.digest().unwrap()); + + // Test - get all the data back out even though it is nested at + // different levels in different platform objects in the runtime + let result = runtime.all_external_data().await.unwrap(); + assert!(result.len() == 2); + for (expected_key, expected_value) in [(key, value), (key2, value2)].iter() { + assert!(result.get(expected_key).is_some()); + match result.get(expected_key) { + Some(v) => { +
assert!(v == expected_value); + } + None => panic!("Value missing for {expected_key} when getting all external data"), + } + } +} + #[rstest] #[tokio::test] async fn test_storage_remove_runtime(tmpdir: tempfile::TempDir) { diff --git a/crates/spfs/src/sync.rs b/crates/spfs/src/sync.rs index f561580cb7..b97fe15049 100644 --- a/crates/spfs/src/sync.rs +++ b/crates/spfs/src/sync.rs @@ -277,6 +277,9 @@ where Object::Manifest(obj) => SyncObjectResult::Manifest(self.sync_manifest(obj).await?), Object::Tree(obj) => SyncObjectResult::Tree(obj), Object::Mask => SyncObjectResult::Mask, + Object::ExternalDataLayer(obj) => { + SyncObjectResult::ExternalDataLayer(self.sync_external_data_layer(obj).await?) + } }; self.reporter.synced_object(&res); Ok(res) @@ -383,6 +386,29 @@ where Ok(res) } + async fn sync_external_data_layer( + &self, + external_data_layer: graph::ExternalDataLayer, + ) -> Result { + let digest = external_data_layer.digest()?; + if !self.processed_digests.insert(digest) { + return Ok(SyncExternalDataLayerResult::Duplicate); + } + self.reporter + .visit_external_data_layer(&external_data_layer); + self.dest + .write_object(&graph::Object::ExternalDataLayer( + external_data_layer.clone(), + )) + .await?; + let res = SyncExternalDataLayerResult::Synced { + external_data_layer, + result: Ok(()), + }; + self.reporter.synced_external_data_layer(&res); + Ok(res) + } + /// Sync the identified blob to the destination repository. 
pub async fn sync_blob(&self, blob: graph::Blob) -> Result { self.sync_blob_with_perms_opt(blob, None).await @@ -557,6 +583,12 @@ pub trait SyncReporter: Send + Sync { /// Called when a payload has finished syncing fn synced_payload(&self, _result: &SyncPayloadResult) {} + + /// Called when an external_data_layer has been identified to sync + fn visit_external_data_layer(&self, _external_data_layer: &graph::ExternalDataLayer) {} + + /// Called when an external_data_layer has finished syncing + fn synced_external_data_layer(&self, _result: &SyncExternalDataLayerResult) {} } #[derive(Default)] @@ -751,6 +783,7 @@ pub enum SyncObjectResult { Manifest(SyncManifestResult), Tree(graph::Tree), Mask, + ExternalDataLayer(SyncExternalDataLayerResult), } impl SyncObjectResult { @@ -767,6 +800,7 @@ impl SyncObjectResult { Blob(res) => res.summary(), Manifest(res) => res.summary(), Mask | Tree(_) => SyncSummary::default(), + ExternalDataLayer(res) => res.summary(), } } } @@ -875,6 +909,31 @@ impl SyncEntryResult { } } +#[derive(Debug)] +pub enum SyncExternalDataLayerResult { + /// The external data layer did not need to be synced + Skipped, + /// The external data layer was already synced in this session + Duplicate, + /// The external data layer was synced + Synced { + external_data_layer: graph::ExternalDataLayer, + result: Result<()>, + }, +} + +impl SyncExternalDataLayerResult { + pub fn summary(&self) -> SyncSummary { + match self { + Self::Skipped | Self::Duplicate => SyncSummary::default(), + Self::Synced { + external_data_layer: _, + result: _, + } => SyncSummary::synced_one_object(), + } + } +} + #[derive(Debug)] pub enum SyncBlobResult { /// The blob did not need to be synced diff --git a/crates/spk-cli/cmd-du/src/cmd_du.rs b/crates/spk-cli/cmd-du/src/cmd_du.rs index 1f926be170..66c01f7cd9 100644 --- a/crates/spk-cli/cmd-du/src/cmd_du.rs +++ b/crates/spk-cli/cmd-du/src/cmd_du.rs @@ -369,7 +369,8 @@ impl Du { } Object::Tree(_) => self.output.warn(format_args!("Tree 
object cannot have disk usage generated")), Object::Blob(_) => self.output.warn(format_args!("Blob object cannot have disk usage generated")), - Object::Mask => () + Object::Mask => (), + Object::ExternalDataLayer(_object) => self.output.warn(format_args!("ExternalDataLayer object does not have disk usage generated")) } } items_to_process = std::mem::take(&mut next_iter_objects);