Skip to content

Commit

Permalink
feat(push): filter derivation by regex
Browse files Browse the repository at this point in the history
  • Loading branch information
shimunn committed Jul 17, 2023
1 parent 4902d57 commit b03aa99
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 11 deletions.
17 changes: 12 additions & 5 deletions attic/src/nix_store/nix_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use regex::Regex;
use tokio::task::spawn_blocking;

use super::bindings::{open_nix_store, AsyncWriteAdapter, FfiNixStore};
use super::{to_base_name, StorePath, ValidPathInfo};
use crate::hash::Hash;
use crate::error::AtticResult;
use crate::hash::Hash;

/// High-level wrapper for the Unix Domain Socket Nix Store.
pub struct NixStore {
Expand Down Expand Up @@ -156,6 +157,7 @@ impl NixStore {
flip_directions: bool,
include_outputs: bool,
include_derivers: bool,
filter: Option<Regex>,
) -> AtticResult<Vec<StorePath>> {
let inner = self.inner.clone();

Expand All @@ -174,16 +176,20 @@ impl NixStore {

Ok(cxx_vector
.iter()
.map(|s| {
.filter_map(|s| {
let osstr = OsStr::from_bytes(s.as_bytes());
let pb = PathBuf::from(osstr);

// Safety: The C++ implementation already checks the StorePath
// for correct format (which also implies valid UTF-8)
#[allow(unsafe_code)]
unsafe {
StorePath::from_base_name_unchecked(pb)
let store_path = unsafe { StorePath::from_base_name_unchecked(pb) };
if let Some(ref filter) = filter {
if filter.is_match(&store_path.name()) {
return None;
}
}
Some(store_path)
})
.collect())
})
Expand All @@ -201,7 +207,8 @@ impl NixStore {

// FIXME: Make this more ergonomic and efficient
let nar_size = c_path_info.pin_mut().nar_size();
let nar_sha256_hash: [u8; 32] = c_path_info.pin_mut().nar_sha256_hash().try_into().unwrap();
let nar_sha256_hash: [u8; 32] =
c_path_info.pin_mut().nar_sha256_hash().try_into().unwrap();
let references = c_path_info
.pin_mut()
.references()
Expand Down
2 changes: 1 addition & 1 deletion attic/src/nix_store/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ fn test_compute_fs_closure_multi() {
];

let actual: HashSet<StorePath> = store
.compute_fs_closure_multi(paths, false, false, false)
.compute_fs_closure_multi(paths, false, false, false, None)
.await
.expect("Could not compute closure")
.into_iter()
Expand Down
12 changes: 11 additions & 1 deletion client/src/command/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Result};
use clap::Parser;
use indicatif::MultiProgress;
use regex::Regex;

use crate::api::ApiClient;
use crate::cache::CacheRef;
Expand All @@ -24,6 +25,10 @@ pub struct Push {
/// The store paths to push.
paths: Vec<PathBuf>,

/// Derivation names to be filted out.
#[clap(long)]
filter: Option<Regex>,

/// Push the specified paths only and do not compute closures.
#[clap(long)]
no_closure: bool,
Expand Down Expand Up @@ -78,7 +83,12 @@ pub async fn run(opts: Opts) -> Result<()> {

let pusher = Pusher::new(store, api, cache.to_owned(), cache_config, mp, push_config);
let plan = pusher
.plan(roots, sub.no_closure, sub.ignore_upstream_cache_filter)
.plan(
roots,
sub.no_closure,
sub.ignore_upstream_cache_filter,
sub.filter.clone(),
)
.await?;

if plan.store_path_map.is_empty() {
Expand Down
18 changes: 14 additions & 4 deletions client/src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use bytes::Bytes;
use futures::future::join_all;
use futures::stream::{Stream, TryStreamExt};
use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressState, ProgressStyle};
use regex::Regex;
use tokio::sync::Mutex;
use tokio::task::{spawn, JoinHandle};
use tokio::time;
Expand Down Expand Up @@ -192,6 +193,7 @@ impl Pusher {
roots: Vec<StorePath>,
no_closure: bool,
ignore_upstream_filter: bool,
filter: Option<Regex>,
) -> Result<PushPlan> {
PushPlan::plan(
self.store.clone(),
Expand All @@ -201,6 +203,7 @@ impl Pusher {
roots,
no_closure,
ignore_upstream_filter,
filter,
)
.await
}
Expand Down Expand Up @@ -336,6 +339,7 @@ impl PushSession {
roots_vec,
config.no_closure,
config.ignore_upstream_cache_filter,
None,
)
.await?;

Expand Down Expand Up @@ -375,28 +379,34 @@ impl PushPlan {
roots: Vec<StorePath>,
no_closure: bool,
ignore_upstream_filter: bool,
filter: Option<Regex>,
) -> Result<Self> {
// Compute closure
let closure = if no_closure {
roots
} else {
store
.compute_fs_closure_multi(roots, false, false, false)
.compute_fs_closure_multi(roots, false, false, false, None)
.await?
};

let mut store_path_map: HashMap<StorePathHash, ValidPathInfo> = {
let futures = closure
.iter()
.map(|path| {
.flat_map(|path| {
let store = store.clone();
let path = path.clone();
if let Some(ref filter) = filter {
if filter.is_match(&path.name()) {
return None;
}
}
let path_hash = path.to_hash();

async move {
Some(async move {
let path_info = store.query_path_info(path).await?;
Ok((path_hash, path_info))
}
})
})
.collect::<Vec<_>>();

Expand Down

0 comments on commit b03aa99

Please sign in to comment.