Skip to content

Commit

Permalink
feat(push): filter derivation by regex
Browse files Browse the repository at this point in the history
  • Loading branch information
shimunn committed Jul 17, 2023
1 parent 4902d57 commit 43026c6
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
12 changes: 11 additions & 1 deletion client/src/command/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Result};
use clap::Parser;
use indicatif::MultiProgress;
use regex::Regex;

use crate::api::ApiClient;
use crate::cache::CacheRef;
Expand All @@ -24,6 +25,10 @@ pub struct Push {
/// The store paths to push.
paths: Vec<PathBuf>,

/// Derivation names to be filted out.
#[clap(long)]
filter: Option<Regex>,

/// Push the specified paths only and do not compute closures.
#[clap(long)]
no_closure: bool,
Expand Down Expand Up @@ -78,7 +83,12 @@ pub async fn run(opts: Opts) -> Result<()> {

let pusher = Pusher::new(store, api, cache.to_owned(), cache_config, mp, push_config);
let plan = pusher
.plan(roots, sub.no_closure, sub.ignore_upstream_cache_filter)
.plan(
roots,
sub.no_closure,
sub.ignore_upstream_cache_filter,
sub.filter.clone(),
)
.await?;

if plan.store_path_map.is_empty() {
Expand Down
18 changes: 14 additions & 4 deletions client/src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use bytes::Bytes;
use futures::future::join_all;
use futures::stream::{Stream, TryStreamExt};
use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressState, ProgressStyle};
use regex::Regex;
use tokio::sync::Mutex;
use tokio::task::{spawn, JoinHandle};
use tokio::time;
Expand Down Expand Up @@ -192,6 +193,7 @@ impl Pusher {
roots: Vec<StorePath>,
no_closure: bool,
ignore_upstream_filter: bool,
filter: Option<Regex>,
) -> Result<PushPlan> {
PushPlan::plan(
self.store.clone(),
Expand All @@ -201,6 +203,7 @@ impl Pusher {
roots,
no_closure,
ignore_upstream_filter,
filter,
)
.await
}
Expand Down Expand Up @@ -336,6 +339,7 @@ impl PushSession {
roots_vec,
config.no_closure,
config.ignore_upstream_cache_filter,
None,
)
.await?;

Expand Down Expand Up @@ -375,28 +379,34 @@ impl PushPlan {
roots: Vec<StorePath>,
no_closure: bool,
ignore_upstream_filter: bool,
filter: Option<Regex>,
) -> Result<Self> {
// Compute closure
let closure = if no_closure {
roots
} else {
store
.compute_fs_closure_multi(roots, false, false, false)
.compute_fs_closure_multi(roots, false, false, false, None)
.await?
};

let mut store_path_map: HashMap<StorePathHash, ValidPathInfo> = {
let futures = closure
.iter()
.map(|path| {
.flat_map(|path| {
let store = store.clone();
let path = path.clone();
if let Some(ref filter) = filter {
if filter.is_match(&path.name()) {
return None;
}
}
let path_hash = path.to_hash();

async move {
Some(async move {
let path_info = store.query_path_info(path).await?;
Ok((path_hash, path_info))
}
})
})
.collect::<Vec<_>>();

Expand Down

0 comments on commit 43026c6

Please sign in to comment.