diff --git a/client/src/command/push.rs b/client/src/command/push.rs index 9e1e795..c93d22d 100644 --- a/client/src/command/push.rs +++ b/client/src/command/push.rs @@ -4,12 +4,13 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use clap::Parser; use indicatif::MultiProgress; +use tokio::io::{self, AsyncBufReadExt, BufReader}; use crate::api::ApiClient; -use crate::cache::CacheRef; +use crate::cache::{CacheName, CacheRef, ServerName}; use crate::cli::Opts; use crate::config::Config; -use crate::push::{PushConfig, Pusher}; +use crate::push::{PushConfig, PushSessionConfig, Pusher}; use attic::nix_store::NixStore; /// Push closures to a binary cache. @@ -24,6 +25,10 @@ pub struct Push { /// The store paths to push. paths: Vec, + /// Read paths from the standard input. + #[clap(long)] + stdin: bool, + /// Push the specified paths only and do not compute closures. #[clap(long)] no_closure: bool, @@ -41,6 +46,79 @@ pub struct Push { force_preamble: bool, } +struct PushContext { + store: Arc, + cache_name: CacheName, + server_name: ServerName, + pusher: Pusher, + no_closure: bool, + ignore_upstream_cache_filter: bool, +} + +impl PushContext { + async fn push_static(self, paths: Vec) -> Result<()> { + let roots = paths + .into_iter() + .map(|p| self.store.follow_store_path(p)) + .collect::, _>>()?; + + let plan = self + .pusher + .plan(roots, self.no_closure, self.ignore_upstream_cache_filter) + .await?; + + if plan.store_path_map.is_empty() { + if plan.num_all_paths == 0 { + eprintln!("🤷 Nothing selected."); + } else { + eprintln!( + "✅ All done! ({num_already_cached} already cached, {num_upstream} in upstream)", + num_already_cached = plan.num_already_cached, + num_upstream = plan.num_upstream, + ); + } + + return Ok(()); + } else { + eprintln!("⚙️ Pushing {num_missing_paths} paths to \"{cache}\" on \"{server}\" ({num_already_cached} already cached, {num_upstream} in upstream)...", + cache = self.cache_name.as_str(), + server = self.server_name.as_str(), + num_missing_paths = plan.store_path_map.len(), + num_already_cached = plan.num_already_cached, + num_upstream = plan.num_upstream, + ); + } + + for (_, path_info) in plan.store_path_map { + self.pusher.queue(path_info).await?; + } + + let results = self.pusher.wait().await; + results.into_values().collect::>>()?; + + Ok(()) + } + + async fn push_stdin(self) -> Result<()> { + let session = self.pusher.into_push_session(PushSessionConfig { + no_closure: self.no_closure, + ignore_upstream_cache_filter: self.ignore_upstream_cache_filter, + }); + + let stdin = BufReader::new(io::stdin()); + let mut lines = stdin.lines(); + while let Some(line) = lines.next_line().await? { + let path = self.store.follow_store_path(line)?; + session.queue_many(vec![path])?; + } + + let results = session.wait().await?; + results.into_values().collect::>>()?; + + Ok(()) + } +} + pub async fn run(opts: Opts) -> Result<()> { let sub = opts.command.as_push().unwrap(); if sub.jobs == 0 { @@ -50,19 +128,13 @@ pub async fn run(opts: Opts) -> Result<()> { let config = Config::load()?; let store = Arc::new(NixStore::connect()?); - let roots = sub - .paths - .clone() - .into_iter() - .map(|p| store.follow_store_path(p)) - .collect::, _>>()?; - let (server_name, server, cache) = config.resolve_cache(&sub.cache)?; + let (server_name, server, cache_name) = config.resolve_cache(&sub.cache)?; let mut api = ApiClient::from_server_config(server.clone())?; // Confirm remote cache validity, query cache config - let cache_config = api.get_cache_config(cache).await?; + let cache_config = api.get_cache_config(cache_name).await?; if let Some(api_endpoint) = &cache_config.api_endpoint { // Use delegated API endpoint @@ -76,39 +148,29 @@ pub async fn run(opts: Opts) -> Result<()> { let mp = MultiProgress::new(); - let pusher = Pusher::new(store, api, cache.to_owned(), cache_config, mp, push_config); - let plan = pusher - .plan(roots, sub.no_closure, sub.ignore_upstream_cache_filter) - .await?; - - if plan.store_path_map.is_empty() { - if plan.num_all_paths == 0 { - eprintln!("🤷 Nothing selected."); - } else { - eprintln!( - "✅ All done! ({num_already_cached} already cached, {num_upstream} in upstream)", - num_already_cached = plan.num_already_cached, - num_upstream = plan.num_upstream, - ); - } + let pusher = Pusher::new( + store.clone(), + api, + cache_name.to_owned(), + cache_config, + mp, + push_config, + ); + + let push_ctx = PushContext { + store, + cache_name: cache_name.clone(), + server_name: server_name.clone(), + pusher, + no_closure: sub.no_closure, + ignore_upstream_cache_filter: sub.ignore_upstream_cache_filter, + }; - return Ok(()); + if sub.stdin { + push_ctx.push_stdin().await?; } else { - eprintln!("⚙️ Pushing {num_missing_paths} paths to \"{cache}\" on \"{server}\" ({num_already_cached} already cached, {num_upstream} in upstream)...", - cache = cache.as_str(), - server = server_name.as_str(), - num_missing_paths = plan.store_path_map.len(), - num_already_cached = plan.num_already_cached, - num_upstream = plan.num_upstream, - ); - } - - for (_, path_info) in plan.store_path_map { - pusher.queue(path_info).await?; + push_ctx.push_static(sub.paths.clone()).await?; } - let results = pusher.wait().await; - results.into_values().collect::>>()?; - Ok(()) } diff --git a/client/src/push.rs b/client/src/push.rs index 1b7194f..309bd4b 100644 --- a/client/src/push.rs +++ b/client/src/push.rs @@ -28,7 +28,7 @@ use bytes::Bytes; use futures::future::join_all; use futures::stream::{Stream, TryStreamExt}; use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressState, ProgressStyle}; -use tokio::sync::Mutex; +use tokio::sync::{mpsc, Mutex}; use tokio::task::{spawn, JoinHandle}; use tokio::time; @@ -100,11 +100,22 @@ pub struct Pusher { /// seconds since the last path is queued or it's been 10 seconds in total. pub struct PushSession { /// Sender to the batching future. - sender: channel::Sender>, + sender: channel::Sender, + + /// Receiver of results. + result_receiver: mpsc::Receiver>>>, +} + +enum SessionQueueCommand { + Paths(Vec), + Flush, + Terminate, } enum SessionQueuePoll { Paths(Vec), + Flush, + Terminate, Closed, TimedOut, } @@ -255,36 +266,36 @@ impl Pusher { impl PushSession { pub fn with_pusher(pusher: Pusher, config: PushSessionConfig) -> Self { let (sender, receiver) = channel::unbounded(); + let (result_sender, result_receiver) = mpsc::channel(1); let known_paths_mutex = Arc::new(Mutex::new(HashSet::new())); - // FIXME spawn(async move { - let pusher = Arc::new(pusher); - loop { - if let Err(e) = Self::worker( - pusher.clone(), - config, - known_paths_mutex.clone(), - receiver.clone(), - ) - .await - { - eprintln!("Worker exited: {:?}", e); - } else { - break; - } + if let Err(e) = Self::worker( + pusher, + config, + known_paths_mutex.clone(), + receiver.clone(), + result_sender.clone(), + ) + .await + { + let _ = result_sender.send(Err(e)).await; } }); - Self { sender } + Self { + sender, + result_receiver, + } } async fn worker( - pusher: Arc, + pusher: Pusher, config: PushSessionConfig, known_paths_mutex: Arc>>, - receiver: channel::Receiver>, + receiver: channel::Receiver, + result_sender: mpsc::Sender>>>, ) -> Result<()> { let mut roots = HashSet::new(); @@ -296,7 +307,9 @@ impl PushSession { loop { let poll = tokio::select! { r = receiver.recv() => match r { - Ok(paths) => SessionQueuePoll::Paths(paths), + Ok(SessionQueueCommand::Paths(paths)) => SessionQueuePoll::Paths(paths), + Ok(SessionQueueCommand::Flush) => SessionQueuePoll::Flush, + Ok(SessionQueueCommand::Terminate) => SessionQueuePoll::Terminate, _ => SessionQueuePoll::Closed, }, _ = time::sleep(Duration::from_secs(2)) => SessionQueuePoll::TimedOut, @@ -306,10 +319,10 @@ impl PushSession { SessionQueuePoll::Paths(store_paths) => { roots.extend(store_paths.into_iter()); } - SessionQueuePoll::Closed => { + SessionQueuePoll::Closed | SessionQueuePoll::Terminate => { break true; } - SessionQueuePoll::TimedOut => { + SessionQueuePoll::Flush | SessionQueuePoll::TimedOut => { break false; } } @@ -352,15 +365,37 @@ impl PushSession { drop(known_paths); if done { + let result = pusher.wait().await; + result_sender.send(Ok(result)).await?; return Ok(()); } } } + /// Waits for all workers to terminate, returning all results. + pub async fn wait(mut self) -> Result>> { + self.flush()?; + + // The worker might have died + let _ = self.sender.send(SessionQueueCommand::Terminate).await; + + self.result_receiver + .recv() + .await + .expect("Nothing in result channel") + } + /// Queues multiple store paths to be pushed. pub fn queue_many(&self, store_paths: Vec) -> Result<()> { self.sender - .send_blocking(store_paths) + .send_blocking(SessionQueueCommand::Paths(store_paths)) + .map_err(|e| anyhow!(e)) + } + + /// Flushes the worker queue. + pub fn flush(&self) -> Result<()> { + self.sender + .send_blocking(SessionQueueCommand::Flush) .map_err(|e| anyhow!(e)) } }