Skip to content

Commit

Permalink
feat(tvix/nar-bridge): init
Browse files Browse the repository at this point in the history
This adds an implementation of nar-bridge in Rust.
Currently, only the GET parts are implemented.

Contrary to the Go variant, this doesn't try to keep a mapping from nar
hashes to root node in memory, it simply encodes the root node itself
(stripped by its basename) into the URL.

This pulls in a more recent version of axum than what we use in
tonic, causing two versions of http and hyper, however dealing with
`Body::from_stream` in axum 0.6 is much more annoying, and
hyperium/tonic#1740 suggests this will be fixed
soon.

Change-Id: Ia4c2dbda7cd3fdbe47a75f3e33544d19eac6e44e
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11898
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Brian Olsen <me@griff.name>
Tested-by: BuildkiteCI
  • Loading branch information
flokli authored and clbot committed Jul 20, 2024
1 parent de6882a commit 7b00a06
Show file tree
Hide file tree
Showing 8 changed files with 1,475 additions and 113 deletions.
268 changes: 231 additions & 37 deletions Cargo.lock

Large diffs are not rendered by default.

937 changes: 861 additions & 76 deletions Cargo.nix

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ members = [
"eval",
"eval/builtin-macros",
"glue",
"nar-bridge",
"nix-compat",
"serde",
"store",
Expand Down
40 changes: 40 additions & 0 deletions nar-bridge/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
[package]
name = "nar-bridge"
version = "0.1.0"
edition = "2021"

[dependencies]
axum = { version = "0.7.5", features = ["http2"] }
bytes = "1.4.0"
clap = { version = "4.0", features = ["derive", "env"] }
data-encoding = "2.3.3"
itertools = "0.12.0"
prost = "0.12.1"
nix-compat = { path = "../nix-compat", features = ["async"] }
thiserror = "1.0.56"
tokio = { version = "1.32.0" }
tokio-listener = { version = "0.4.2", features = [ "axum07", "clap", "multi-listener", "sd_listen" ] }
tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }
tvix-castore = { path = "../castore" }
tvix-store = { path = "../store" }
tvix-tracing = { path = "../tracing", features = ["tonic"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
url = "2.4.0"
serde = { version = "1.0.204", features = ["derive"] }

[build-dependencies]
prost-build = "0.12.1"
tonic-build = "0.11.0"

[features]
default = ["otlp"]
otlp = ["tvix-tracing/otlp"]

[dev-dependencies]
hex-literal = "0.4.1"
rstest = "0.19.0"

[lints]
workspace = true
84 changes: 84 additions & 0 deletions nar-bridge/src/bin/nar-bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use clap::Parser;
use nar_bridge::AppState;
use tracing::info;

/// Expose the Nix HTTP Binary Cache protocol for a tvix-store.
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
blob_service_addr: String,

#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
directory_service_addr: String,

#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
path_info_service_addr: String,

/// The priority to announce at the `nix-cache-info` endpoint.
/// A lower number means it's *more preferred.
#[arg(long, env, default_value_t = 39)]
priority: u64,

/// The address to listen on.
#[clap(flatten)]
listen_args: tokio_listener::ListenerAddressLFlag,

#[cfg(feature = "otlp")]
/// Whether to configure OTLP. Set --otlp=false to disable.
#[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))]
otlp: bool,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cli = Cli::parse();

let _tracing_handle = {
#[allow(unused_mut)]
let mut builder = tvix_tracing::TracingBuilder::default();
#[cfg(feature = "otlp")]
{
if cli.otlp {
builder = builder.enable_otlp("tvix.store");
}
}
builder.build()?
};

// initialize stores
let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
tvix_store::utils::construct_services(
cli.blob_service_addr,
cli.directory_service_addr,
cli.path_info_service_addr,
)
.await?;

let state = AppState::new(blob_service, directory_service, path_info_service.into());

let app = nar_bridge::gen_router(cli.priority).with_state(state);

let listen_address = &cli.listen_args.listen_address.unwrap_or_else(|| {
"[::]:8000"
.parse()
.expect("invalid fallback listen address")
});

let listener = tokio_listener::Listener::bind(
listen_address,
&Default::default(),
&cli.listen_args.listener_options,
)
.await?;

info!(listen_address=%listen_address, "starting daemon");

tokio_listener::axum07::serve(
listener,
app.into_make_service_with_connect_info::<tokio_listener::SomeSocketAddrClonable>(),
)
.await?;

Ok(())
}
50 changes: 50 additions & 0 deletions nar-bridge/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use axum::routing::head;
use axum::{routing::get, Router};
use std::sync::Arc;
use tvix_castore::blobservice::BlobService;
use tvix_castore::directoryservice::DirectoryService;
use tvix_store::pathinfoservice::PathInfoService;

mod nar;
mod narinfo;

#[derive(Clone)]
pub struct AppState {
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
path_info_service: Arc<dyn PathInfoService>,
}

impl AppState {
pub fn new(
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
path_info_service: Arc<dyn PathInfoService>,
) -> Self {
Self {
blob_service,
directory_service,
path_info_service,
}
}
}

pub fn gen_router(priority: u64) -> Router<AppState> {
Router::new()
.route("/", get(root))
.route("/nar/tvix-castore/:root_node_enc", get(nar::get))
.route("/:narinfo_str", get(narinfo::get))
.route("/:narinfo_str", head(narinfo::head))
.route("/nix-cache-info", get(move || nix_cache_info(priority)))
}

async fn root() -> &'static str {
"Hello from nar-bridge"
}

async fn nix_cache_info(priority: u64) -> String {
format!(
"StoreDir: /nix/store\nWantMassQuery: 1\nPriority: {}\n",
priority
)
}
77 changes: 77 additions & 0 deletions nar-bridge/src/nar.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use axum::body::Body;
use axum::extract::Query;
use axum::http::StatusCode;
use axum::response::Response;
use bytes::Bytes;
use data_encoding::BASE64URL_NOPAD;
use serde::Deserialize;
use tokio_util::io::ReaderStream;
use tracing::{instrument, warn};

use crate::AppState;

#[derive(Debug, Deserialize)]
pub(crate) struct GetNARParams {
#[serde(rename = "narsize")]
nar_size: u64,
}

#[instrument(skip(blob_service, directory_service))]
pub async fn get(
axum::extract::Path(root_node_enc): axum::extract::Path<String>,
axum::extract::Query(GetNARParams { nar_size }): Query<GetNARParams>,
axum::extract::State(AppState {
blob_service,
directory_service,
..
}): axum::extract::State<AppState>,
) -> Result<Response, StatusCode> {
use prost::Message;
// b64decode the root node passed *by the user*
let root_node_proto = BASE64URL_NOPAD
.decode(root_node_enc.as_bytes())
.map_err(|e| {
warn!(err=%e, "unable to decode root node b64");
StatusCode::NOT_FOUND
})?;

// check the proto size to be somewhat reasonable before parsing it.
if root_node_proto.len() > 4096 {
warn!("rejected too large root node");
return Err(StatusCode::BAD_REQUEST);
}

// parse the proto
let root_node: tvix_castore::proto::Node = Message::decode(Bytes::from(root_node_enc))
.map_err(|e| {
warn!(err=%e, "unable to decode root node proto");
StatusCode::NOT_FOUND
})?;

// validate it.
let root_node = root_node
.validate()
.map_err(|e| {
warn!(err=%e, "root node validation failed");
StatusCode::BAD_REQUEST
})?
.to_owned();

let (w, r) = tokio::io::duplex(1024 * 8);

// spawn a task rendering the NAR to the client
tokio::spawn(async move {
if let Err(e) =
tvix_store::nar::write_nar(w, &root_node, blob_service, directory_service).await
{
warn!(err=%e, "failed to write out NAR");
}
});

Ok(Response::builder()
.status(StatusCode::OK)
.header("cache-control", "max-age=31536000, immutable")
.header("content-length", nar_size)
.body(Body::from_stream(ReaderStream::new(r)))
.unwrap())
}
131 changes: 131 additions & 0 deletions nar-bridge/src/narinfo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use axum::http::StatusCode;
use nix_compat::nixbase32;
use tracing::{instrument, warn, Span};
use tvix_castore::proto::node::Node;

use crate::AppState;

#[instrument(skip(path_info_service))]
pub async fn head(
axum::extract::Path(narinfo_str): axum::extract::Path<String>,
axum::extract::State(AppState {
path_info_service, ..
}): axum::extract::State<AppState>,
) -> Result<&'static str, StatusCode> {
let digest = parse_narinfo_str(&narinfo_str)?;
Span::current().record("path_info.digest", &narinfo_str[0..32]);

if path_info_service
.get(digest)
.await
.map_err(|e| {
warn!(err=%e, "failed to get PathInfo");
StatusCode::INTERNAL_SERVER_ERROR
})?
.is_some()
{
Ok("")
} else {
warn!("PathInfo not found");
Err(StatusCode::NOT_FOUND)
}
}

#[instrument(skip(path_info_service))]
pub async fn get(
axum::extract::Path(narinfo_str): axum::extract::Path<String>,
axum::extract::State(AppState {
path_info_service, ..
}): axum::extract::State<AppState>,
) -> Result<String, StatusCode> {
let digest = parse_narinfo_str(&narinfo_str)?;
Span::current().record("path_info.digest", &narinfo_str[0..32]);

// fetch the PathInfo
let path_info = path_info_service
.get(digest)
.await
.map_err(|e| {
warn!(err=%e, "failed to get PathInfo");
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;

let store_path = path_info.validate().map_err(|e| {
warn!(err=%e, "invalid PathInfo");
StatusCode::INTERNAL_SERVER_ERROR
})?;

let mut narinfo = path_info.to_narinfo(store_path).ok_or_else(|| {
warn!(path_info=?path_info, "PathInfo contained no NAR data");
StatusCode::INTERNAL_SERVER_ERROR
})?;

// encode the (unnamed) root node in the NAR url itself.
let root_node = path_info
.node
.as_ref()
.and_then(|n| n.node.as_ref())
.expect("root node must not be none")
.clone()
.rename("".into());

let mut buf = Vec::new();
Node::encode(&root_node, &mut buf);

let url = format!(
"nar/tvix-castore/{}?narsize={}",
data_encoding::BASE64URL_NOPAD.encode(&buf),
narinfo.nar_size,
);

narinfo.url = &url;

Ok(narinfo.to_string())
}

/// Parses a `3mzh8lvgbynm9daj7c82k2sfsfhrsfsy.narinfo` string and returns the
/// nixbase32-decoded digest.
fn parse_narinfo_str(s: &str) -> Result<[u8; 20], StatusCode> {
if !s.is_char_boundary(32) {
warn!("invalid string, no char boundary at 32");
return Err(StatusCode::NOT_FOUND);
}

Ok(match s.split_at(32) {
(hash_str, ".narinfo") => {
// we know this is 32 bytes
let hash_str_fixed: [u8; 32] = hash_str.as_bytes().try_into().unwrap();
nixbase32::decode_fixed(hash_str_fixed).map_err(|e| {
warn!(err=%e, "invalid digest");
StatusCode::NOT_FOUND
})?
}
_ => {
warn!("invalid string");
return Err(StatusCode::NOT_FOUND);
}
})
}

#[cfg(test)]
mod test {
use super::parse_narinfo_str;
use hex_literal::hex;

#[test]
fn success() {
assert_eq!(
hex!("8a12321522fd91efbd60ebb2481af88580f61600"),
parse_narinfo_str("00bgd045z0d4icpbc2yyz4gx48ak44la.narinfo").unwrap()
);
}

#[test]
fn failure() {
assert!(parse_narinfo_str("00bgd045z0d4icpbc2yyz4gx48ak44la").is_err());
assert!(parse_narinfo_str("/00bgd045z0d4icpbc2yyz4gx48ak44la").is_err());
assert!(parse_narinfo_str("000000").is_err());
assert!(parse_narinfo_str("00bgd045z0d4icpbc2yyz4gx48ak44l🦊.narinfo").is_err());
}
}

0 comments on commit 7b00a06

Please sign in to comment.