Skip to content

Commit

Permalink
client/push: Use PushSession for --stdin
Browse files Browse the repository at this point in the history
Instead of eagerly consuming stdin, read line-by-line and feed into
PushSession. This allows for a `nix-build | attic push` workflow.
  • Loading branch information
zhaofengli committed Oct 4, 2024
1 parent 99f3fbd commit c5764fc
Showing 1 changed file with 98 additions and 50 deletions.
148 changes: 98 additions & 50 deletions client/src/command/push.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
use std::io;
use std::io::BufRead;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use clap::Parser;
use indicatif::MultiProgress;
use tokio::io::{self, AsyncBufReadExt, BufReader};

use crate::api::ApiClient;
use crate::cache::CacheRef;
use crate::cache::{CacheName, CacheRef, ServerName};
use crate::cli::Opts;
use crate::config::Config;
use crate::push::{PushConfig, Pusher};
use crate::push::{PushConfig, PushSessionConfig, Pusher};
use attic::nix_store::NixStore;

/// Push closures to a binary cache.
Expand Down Expand Up @@ -47,6 +46,79 @@ pub struct Push {
force_preamble: bool,
}

struct PushContext {
store: Arc<NixStore>,
cache_name: CacheName,
server_name: ServerName,
pusher: Pusher,
no_closure: bool,
ignore_upstream_cache_filter: bool,
}

impl PushContext {
async fn push_static(self, paths: Vec<PathBuf>) -> Result<()> {
let roots = paths
.into_iter()
.map(|p| self.store.follow_store_path(p))
.collect::<std::result::Result<Vec<_>, _>>()?;

let plan = self
.pusher
.plan(roots, self.no_closure, self.ignore_upstream_cache_filter)
.await?;

if plan.store_path_map.is_empty() {
if plan.num_all_paths == 0 {
eprintln!("🤷 Nothing selected.");
} else {
eprintln!(
"✅ All done! ({num_already_cached} already cached, {num_upstream} in upstream)",
num_already_cached = plan.num_already_cached,
num_upstream = plan.num_upstream,
);
}

return Ok(());
} else {
eprintln!("⚙️ Pushing {num_missing_paths} paths to \"{cache}\" on \"{server}\" ({num_already_cached} already cached, {num_upstream} in upstream)...",
cache = self.cache_name.as_str(),
server = self.server_name.as_str(),
num_missing_paths = plan.store_path_map.len(),
num_already_cached = plan.num_already_cached,
num_upstream = plan.num_upstream,
);
}

for (_, path_info) in plan.store_path_map {
self.pusher.queue(path_info).await?;
}

let results = self.pusher.wait().await;
results.into_values().collect::<Result<Vec<()>>>()?;

Ok(())
}

async fn push_stdin(self) -> Result<()> {
let session = self.pusher.into_push_session(PushSessionConfig {
no_closure: self.no_closure,
ignore_upstream_cache_filter: self.ignore_upstream_cache_filter,
});

let stdin = BufReader::new(io::stdin());
let mut lines = stdin.lines();
while let Some(line) = lines.next_line().await? {
let path = self.store.follow_store_path(line)?;
session.queue_many(vec![path])?;
}

let results = session.wait().await?;
results.into_values().collect::<Result<Vec<()>>>()?;

Ok(())
}
}

pub async fn run(opts: Opts) -> Result<()> {
let sub = opts.command.as_push().unwrap();
if sub.jobs == 0 {
Expand All @@ -56,27 +128,13 @@ pub async fn run(opts: Opts) -> Result<()> {
let config = Config::load()?;

let store = Arc::new(NixStore::connect()?);
let roots = if sub.stdin {
io::stdin()
.lock()
.lines()
.flatten()
.map(|p| store.follow_store_path(p.trim()))
.collect::<std::result::Result<Vec<_>, _>>()?
} else {
sub.paths
.clone()
.into_iter()
.map(|p| store.follow_store_path(p))
.collect::<std::result::Result<Vec<_>, _>>()?
};

let (server_name, server, cache) = config.resolve_cache(&sub.cache)?;
let (server_name, server, cache_name) = config.resolve_cache(&sub.cache)?;

let mut api = ApiClient::from_server_config(server.clone())?;

// Confirm remote cache validity, query cache config
let cache_config = api.get_cache_config(cache).await?;
let cache_config = api.get_cache_config(cache_name).await?;

if let Some(api_endpoint) = &cache_config.api_endpoint {
// Use delegated API endpoint
Expand All @@ -90,39 +148,29 @@ pub async fn run(opts: Opts) -> Result<()> {

let mp = MultiProgress::new();

let pusher = Pusher::new(store, api, cache.to_owned(), cache_config, mp, push_config);
let plan = pusher
.plan(roots, sub.no_closure, sub.ignore_upstream_cache_filter)
.await?;

if plan.store_path_map.is_empty() {
if plan.num_all_paths == 0 {
eprintln!("🤷 Nothing selected.");
} else {
eprintln!(
"✅ All done! ({num_already_cached} already cached, {num_upstream} in upstream)",
num_already_cached = plan.num_already_cached,
num_upstream = plan.num_upstream,
);
}
let pusher = Pusher::new(
store.clone(),
api,
cache_name.to_owned(),
cache_config,
mp,
push_config,
);

let push_ctx = PushContext {
store,
cache_name: cache_name.clone(),
server_name: server_name.clone(),
pusher,
no_closure: sub.no_closure,
ignore_upstream_cache_filter: sub.ignore_upstream_cache_filter,
};

return Ok(());
if sub.stdin {
push_ctx.push_stdin().await?;
} else {
eprintln!("⚙️ Pushing {num_missing_paths} paths to \"{cache}\" on \"{server}\" ({num_already_cached} already cached, {num_upstream} in upstream)...",
cache = cache.as_str(),
server = server_name.as_str(),
num_missing_paths = plan.store_path_map.len(),
num_already_cached = plan.num_already_cached,
num_upstream = plan.num_upstream,
);
}

for (_, path_info) in plan.store_path_map {
pusher.queue(path_info).await?;
push_ctx.push_static(sub.paths.clone()).await?;
}

let results = pusher.wait().await;
results.into_values().collect::<Result<Vec<()>>>()?;

Ok(())
}

0 comments on commit c5764fc

Please sign in to comment.