diff --git a/client/src/command/push.rs b/client/src/command/push.rs
index f1ea560..ba7945a 100644
--- a/client/src/command/push.rs
+++ b/client/src/command/push.rs
@@ -1,17 +1,16 @@
-use std::io;
-use std::io::BufRead;
 use std::path::PathBuf;
 use std::sync::Arc;
 
 use anyhow::{anyhow, Result};
 use clap::Parser;
 use indicatif::MultiProgress;
+use tokio::io::{self, AsyncBufReadExt, BufReader};
 
 use crate::api::ApiClient;
-use crate::cache::CacheRef;
+use crate::cache::{CacheName, CacheRef, ServerName};
 use crate::cli::Opts;
 use crate::config::Config;
-use crate::push::{PushConfig, Pusher};
+use crate::push::{PushConfig, PushSessionConfig, Pusher};
 use attic::nix_store::NixStore;
 
 /// Push closures to a binary cache.
@@ -47,6 +46,80 @@ pub struct Push {
     force_preamble: bool,
 }
 
+struct PushContext {
+    store: Arc<NixStore>,
+    cache_name: CacheName,
+    server_name: ServerName,
+    pusher: Pusher,
+    no_closure: bool,
+    ignore_upstream_cache_filter: bool,
+}
+
+impl PushContext {
+    async fn push_static(self, paths: &Vec<PathBuf>) -> Result<()> {
+        let roots = paths
+            .clone()
+            .into_iter()
+            .map(|p| self.store.follow_store_path(p))
+            .collect::<Result<Vec<_>, _>>()?;
+
+        let plan = self
+            .pusher
+            .plan(roots, self.no_closure, self.ignore_upstream_cache_filter)
+            .await?;
+
+        if plan.store_path_map.is_empty() {
+            if plan.num_all_paths == 0 {
+                eprintln!("🤷 Nothing selected.");
+            } else {
+                eprintln!(
+                    "✅ All done! ({num_already_cached} already cached, {num_upstream} in upstream)",
+                    num_already_cached = plan.num_already_cached,
+                    num_upstream = plan.num_upstream,
+                );
+            }
+
+            return Ok(());
+        } else {
+            eprintln!("⚙️ Pushing {num_missing_paths} paths to \"{cache}\" on \"{server}\" ({num_already_cached} already cached, {num_upstream} in upstream)...",
+                cache = self.cache_name.as_str(),
+                server = self.server_name.as_str(),
+                num_missing_paths = plan.store_path_map.len(),
+                num_already_cached = plan.num_already_cached,
+                num_upstream = plan.num_upstream,
+            );
+        }
+
+        for (_, path_info) in plan.store_path_map {
+            self.pusher.queue(path_info).await?;
+        }
+
+        let results = self.pusher.wait().await;
+        results.into_values().collect::<Result<Vec<()>>>()?;
+
+        Ok(())
+    }
+
+    async fn push_stdin(self) -> Result<()> {
+        let session = self.pusher.into_push_session(PushSessionConfig {
+            no_closure: self.no_closure,
+            ignore_upstream_cache_filter: self.ignore_upstream_cache_filter,
+        });
+
+        let stdin = BufReader::new(io::stdin());
+        let mut lines = stdin.lines();
+        while let Some(line) = lines.next_line().await? {
+            let path = self.store.follow_store_path(line)?;
+            session.queue_many(vec![path])?;
+        }
+
+        let results = session.wait().await?;
+        results.into_values().collect::<Result<Vec<()>>>()?;
+
+        Ok(())
+    }
+}
+
 pub async fn run(opts: Opts) -> Result<()> {
     let sub = opts.command.as_push().unwrap();
     if sub.jobs == 0 {
@@ -56,27 +129,13 @@ pub async fn run(opts: Opts) -> Result<()> {
     let config = Config::load()?;
 
     let store = Arc::new(NixStore::connect()?);
 
-    let roots = if sub.stdin {
-        io::stdin()
-            .lock()
-            .lines()
-            .flatten()
-            .map(|p| store.follow_store_path(p.trim()))
-            .collect::<Result<Vec<_>, _>>()?
-    } else {
-        sub.paths
-            .clone()
-            .into_iter()
-            .map(|p| store.follow_store_path(p))
-            .collect::<Result<Vec<_>, _>>()?
-    };
-    let (server_name, server, cache) = config.resolve_cache(&sub.cache)?;
+    let (server_name, server, cache_name) = config.resolve_cache(&sub.cache)?;
 
     let mut api = ApiClient::from_server_config(server.clone())?;
 
     // Confirm remote cache validity, query cache config
-    let cache_config = api.get_cache_config(cache).await?;
+    let cache_config = api.get_cache_config(cache_name).await?;
 
     if let Some(api_endpoint) = &cache_config.api_endpoint {
         // Use delegated API endpoint
@@ -90,39 +149,29 @@ pub async fn run(opts: Opts) -> Result<()> {
 
     let mp = MultiProgress::new();
 
-    let pusher = Pusher::new(store, api, cache.to_owned(), cache_config, mp, push_config);
-    let plan = pusher
-        .plan(roots, sub.no_closure, sub.ignore_upstream_cache_filter)
-        .await?;
-
-    if plan.store_path_map.is_empty() {
-        if plan.num_all_paths == 0 {
-            eprintln!("🤷 Nothing selected.");
-        } else {
-            eprintln!(
-                "✅ All done! ({num_already_cached} already cached, {num_upstream} in upstream)",
-                num_already_cached = plan.num_already_cached,
-                num_upstream = plan.num_upstream,
-            );
-        }
+    let pusher = Pusher::new(
+        store.clone(),
+        api,
+        cache_name.to_owned(),
+        cache_config,
+        mp,
+        push_config,
+    );
+
+    let push_ctx = PushContext {
+        store,
+        cache_name: cache_name.clone(),
+        server_name: server_name.clone(),
+        pusher,
+        no_closure: sub.no_closure,
+        ignore_upstream_cache_filter: sub.ignore_upstream_cache_filter,
+    };
 
-        return Ok(());
+    if sub.stdin {
+        push_ctx.push_stdin().await?;
     } else {
-        eprintln!("⚙️ Pushing {num_missing_paths} paths to \"{cache}\" on \"{server}\" ({num_already_cached} already cached, {num_upstream} in upstream)...",
-            cache = cache.as_str(),
-            server = server_name.as_str(),
-            num_missing_paths = plan.store_path_map.len(),
-            num_already_cached = plan.num_already_cached,
-            num_upstream = plan.num_upstream,
-        );
-    }
-
-    for (_, path_info) in plan.store_path_map {
-        pusher.queue(path_info).await?;
+        push_ctx.push_static(&sub.paths).await?;
     }
 
-    let results = pusher.wait().await;
-    results.into_values().collect::<Result<Vec<()>>>()?;
-
     Ok(())
 }
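
For illustration, the core of the new push_stdin() path is tokio's buffered, asynchronous line reader: each store path piped in can be resolved and queued on the push session as soon as it arrives, instead of being collected in full before planning as the removed std::io code did. Below is a minimal, self-contained sketch of that pattern, not attic code; it assumes a tokio runtime with the io-std, io-util, macros, and rt-multi-thread features, and it only echoes each line where push_stdin() would queue the path.

use tokio::io::{self, AsyncBufReadExt, BufReader};

#[tokio::main]
async fn main() -> io::Result<()> {
    // Wrap the asynchronous stdin handle in a BufReader so lines() can split on newlines.
    let stdin = BufReader::new(io::stdin());
    let mut lines = stdin.lines();

    // next_line() resolves to Ok(Some(line)) for each line and Ok(None) at EOF,
    // so work can begin on the first path before the producer finishes writing.
    while let Some(line) = lines.next_line().await? {
        // push_stdin() resolves the store path and queues it on the session here;
        // this sketch just prints it to demonstrate the streaming behavior.
        println!("queued: {}", line.trim());
    }

    Ok(())
}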