Skip to content

Commit

Permalink
Merge pull request #176 from jzbor/push-from-stdin
Browse files Browse the repository at this point in the history
attic-client/push: Add flag to read paths from stdin
  • Loading branch information
zhaofengli authored Oct 4, 2024
2 parents c4c7341 + c5764fc commit 61ebdef
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 64 deletions.
142 changes: 102 additions & 40 deletions client/src/command/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use std::sync::Arc;
use anyhow::{anyhow, Result};
use clap::Parser;
use indicatif::MultiProgress;
use tokio::io::{self, AsyncBufReadExt, BufReader};

use crate::api::ApiClient;
use crate::cache::CacheRef;
use crate::cache::{CacheName, CacheRef, ServerName};
use crate::cli::Opts;
use crate::config::Config;
use crate::push::{PushConfig, Pusher};
use crate::push::{PushConfig, PushSessionConfig, Pusher};
use attic::nix_store::NixStore;

/// Push closures to a binary cache.
Expand All @@ -24,6 +25,10 @@ pub struct Push {
/// The store paths to push.
paths: Vec<PathBuf>,

/// Read paths from the standard input.
#[clap(long)]
stdin: bool,

/// Push the specified paths only and do not compute closures.
#[clap(long)]
no_closure: bool,
Expand All @@ -41,6 +46,79 @@ pub struct Push {
force_preamble: bool,
}

struct PushContext {
store: Arc<NixStore>,
cache_name: CacheName,
server_name: ServerName,
pusher: Pusher,
no_closure: bool,
ignore_upstream_cache_filter: bool,
}

impl PushContext {
async fn push_static(self, paths: Vec<PathBuf>) -> Result<()> {
let roots = paths
.into_iter()
.map(|p| self.store.follow_store_path(p))
.collect::<std::result::Result<Vec<_>, _>>()?;

let plan = self
.pusher
.plan(roots, self.no_closure, self.ignore_upstream_cache_filter)
.await?;

if plan.store_path_map.is_empty() {
if plan.num_all_paths == 0 {
eprintln!("🤷 Nothing selected.");
} else {
eprintln!(
"✅ All done! ({num_already_cached} already cached, {num_upstream} in upstream)",
num_already_cached = plan.num_already_cached,
num_upstream = plan.num_upstream,
);
}

return Ok(());
} else {
eprintln!("⚙️ Pushing {num_missing_paths} paths to \"{cache}\" on \"{server}\" ({num_already_cached} already cached, {num_upstream} in upstream)...",
cache = self.cache_name.as_str(),
server = self.server_name.as_str(),
num_missing_paths = plan.store_path_map.len(),
num_already_cached = plan.num_already_cached,
num_upstream = plan.num_upstream,
);
}

for (_, path_info) in plan.store_path_map {
self.pusher.queue(path_info).await?;
}

let results = self.pusher.wait().await;
results.into_values().collect::<Result<Vec<()>>>()?;

Ok(())
}

async fn push_stdin(self) -> Result<()> {
let session = self.pusher.into_push_session(PushSessionConfig {
no_closure: self.no_closure,
ignore_upstream_cache_filter: self.ignore_upstream_cache_filter,
});

let stdin = BufReader::new(io::stdin());
let mut lines = stdin.lines();
while let Some(line) = lines.next_line().await? {
let path = self.store.follow_store_path(line)?;
session.queue_many(vec![path])?;
}

let results = session.wait().await?;
results.into_values().collect::<Result<Vec<()>>>()?;

Ok(())
}
}

pub async fn run(opts: Opts) -> Result<()> {
let sub = opts.command.as_push().unwrap();
if sub.jobs == 0 {
Expand All @@ -50,19 +128,13 @@ pub async fn run(opts: Opts) -> Result<()> {
let config = Config::load()?;

let store = Arc::new(NixStore::connect()?);
let roots = sub
.paths
.clone()
.into_iter()
.map(|p| store.follow_store_path(p))
.collect::<std::result::Result<Vec<_>, _>>()?;

let (server_name, server, cache) = config.resolve_cache(&sub.cache)?;
let (server_name, server, cache_name) = config.resolve_cache(&sub.cache)?;

let mut api = ApiClient::from_server_config(server.clone())?;

// Confirm remote cache validity, query cache config
let cache_config = api.get_cache_config(cache).await?;
let cache_config = api.get_cache_config(cache_name).await?;

if let Some(api_endpoint) = &cache_config.api_endpoint {
// Use delegated API endpoint
Expand All @@ -76,39 +148,29 @@ pub async fn run(opts: Opts) -> Result<()> {

let mp = MultiProgress::new();

let pusher = Pusher::new(store, api, cache.to_owned(), cache_config, mp, push_config);
let plan = pusher
.plan(roots, sub.no_closure, sub.ignore_upstream_cache_filter)
.await?;

if plan.store_path_map.is_empty() {
if plan.num_all_paths == 0 {
eprintln!("🤷 Nothing selected.");
} else {
eprintln!(
"✅ All done! ({num_already_cached} already cached, {num_upstream} in upstream)",
num_already_cached = plan.num_already_cached,
num_upstream = plan.num_upstream,
);
}
let pusher = Pusher::new(
store.clone(),
api,
cache_name.to_owned(),
cache_config,
mp,
push_config,
);

let push_ctx = PushContext {
store,
cache_name: cache_name.clone(),
server_name: server_name.clone(),
pusher,
no_closure: sub.no_closure,
ignore_upstream_cache_filter: sub.ignore_upstream_cache_filter,
};

return Ok(());
if sub.stdin {
push_ctx.push_stdin().await?;
} else {
eprintln!("⚙️ Pushing {num_missing_paths} paths to \"{cache}\" on \"{server}\" ({num_already_cached} already cached, {num_upstream} in upstream)...",
cache = cache.as_str(),
server = server_name.as_str(),
num_missing_paths = plan.store_path_map.len(),
num_already_cached = plan.num_already_cached,
num_upstream = plan.num_upstream,
);
}

for (_, path_info) in plan.store_path_map {
pusher.queue(path_info).await?;
push_ctx.push_static(sub.paths.clone()).await?;
}

let results = pusher.wait().await;
results.into_values().collect::<Result<Vec<()>>>()?;

Ok(())
}
83 changes: 59 additions & 24 deletions client/src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use bytes::Bytes;
use futures::future::join_all;
use futures::stream::{Stream, TryStreamExt};
use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressState, ProgressStyle};
use tokio::sync::Mutex;
use tokio::sync::{mpsc, Mutex};
use tokio::task::{spawn, JoinHandle};
use tokio::time;

Expand Down Expand Up @@ -100,11 +100,22 @@ pub struct Pusher {
/// seconds since the last path is queued or it's been 10 seconds in total.
pub struct PushSession {
/// Sender to the batching future.
sender: channel::Sender<Vec<StorePath>>,
sender: channel::Sender<SessionQueueCommand>,

/// Receiver of results.
result_receiver: mpsc::Receiver<Result<HashMap<StorePath, Result<()>>>>,
}

enum SessionQueueCommand {
Paths(Vec<StorePath>),
Flush,
Terminate,
}

enum SessionQueuePoll {
Paths(Vec<StorePath>),
Flush,
Terminate,
Closed,
TimedOut,
}
Expand Down Expand Up @@ -255,36 +266,36 @@ impl Pusher {
impl PushSession {
pub fn with_pusher(pusher: Pusher, config: PushSessionConfig) -> Self {
let (sender, receiver) = channel::unbounded();
let (result_sender, result_receiver) = mpsc::channel(1);

let known_paths_mutex = Arc::new(Mutex::new(HashSet::new()));

// FIXME
spawn(async move {
let pusher = Arc::new(pusher);
loop {
if let Err(e) = Self::worker(
pusher.clone(),
config,
known_paths_mutex.clone(),
receiver.clone(),
)
.await
{
eprintln!("Worker exited: {:?}", e);
} else {
break;
}
if let Err(e) = Self::worker(
pusher,
config,
known_paths_mutex.clone(),
receiver.clone(),
result_sender.clone(),
)
.await
{
let _ = result_sender.send(Err(e)).await;
}
});

Self { sender }
Self {
sender,
result_receiver,
}
}

async fn worker(
pusher: Arc<Pusher>,
pusher: Pusher,
config: PushSessionConfig,
known_paths_mutex: Arc<Mutex<HashSet<StorePathHash>>>,
receiver: channel::Receiver<Vec<StorePath>>,
receiver: channel::Receiver<SessionQueueCommand>,
result_sender: mpsc::Sender<Result<HashMap<StorePath, Result<()>>>>,
) -> Result<()> {
let mut roots = HashSet::new();

Expand All @@ -296,7 +307,9 @@ impl PushSession {
loop {
let poll = tokio::select! {
r = receiver.recv() => match r {
Ok(paths) => SessionQueuePoll::Paths(paths),
Ok(SessionQueueCommand::Paths(paths)) => SessionQueuePoll::Paths(paths),
Ok(SessionQueueCommand::Flush) => SessionQueuePoll::Flush,
Ok(SessionQueueCommand::Terminate) => SessionQueuePoll::Terminate,
_ => SessionQueuePoll::Closed,
},
_ = time::sleep(Duration::from_secs(2)) => SessionQueuePoll::TimedOut,
Expand All @@ -306,10 +319,10 @@ impl PushSession {
SessionQueuePoll::Paths(store_paths) => {
roots.extend(store_paths.into_iter());
}
SessionQueuePoll::Closed => {
SessionQueuePoll::Closed | SessionQueuePoll::Terminate => {
break true;
}
SessionQueuePoll::TimedOut => {
SessionQueuePoll::Flush | SessionQueuePoll::TimedOut => {
break false;
}
}
Expand Down Expand Up @@ -352,15 +365,37 @@ impl PushSession {
drop(known_paths);

if done {
let result = pusher.wait().await;
result_sender.send(Ok(result)).await?;
return Ok(());
}
}
}

/// Waits for all workers to terminate, returning all results.
pub async fn wait(mut self) -> Result<HashMap<StorePath, Result<()>>> {
self.flush()?;

// The worker might have died
let _ = self.sender.send(SessionQueueCommand::Terminate).await;

self.result_receiver
.recv()
.await
.expect("Nothing in result channel")
}

/// Queues multiple store paths to be pushed.
pub fn queue_many(&self, store_paths: Vec<StorePath>) -> Result<()> {
self.sender
.send_blocking(store_paths)
.send_blocking(SessionQueueCommand::Paths(store_paths))
.map_err(|e| anyhow!(e))
}

/// Flushes the worker queue.
pub fn flush(&self) -> Result<()> {
self.sender
.send_blocking(SessionQueueCommand::Flush)
.map_err(|e| anyhow!(e))
}
}
Expand Down

0 comments on commit 61ebdef

Please sign in to comment.