Merge branch 'main' into refactor-futures

dignifiedquire authored Apr 29, 2024
2 parents f6945a7 + 2c49ee8 commit 5dfc961
Showing 6 changed files with 156 additions and 56 deletions.
124 changes: 76 additions & 48 deletions iroh-bytes/src/store/fs.rs
@@ -1611,65 +1611,93 @@ impl ActorState {
EntryState::Complete {
data_location,
outboard_location,
} => {
match data_location {
DataLocation::Inline(()) => {
// ignore export mode, just copy. For inline data we can not reference anyway.
let data = tables.inline_data.get(temp_tag.hash())?.ok_or_else(|| {
ActorError::Inconsistent("inline data not found".to_owned())
})?;
tracing::trace!("exporting inline data to {}", target.display());
tx.send(std::fs::write(&target, data.value()).map_err(|e| e.into()))
.ok();
}
DataLocation::Owned(size) => {
let path = self.options.path.owned_data_path(temp_tag.hash());
if mode == ExportMode::Copy {
} => match data_location {
DataLocation::Inline(()) => {
// ignore export mode, just copy. For inline data we can not reference anyway.
let data = tables.inline_data.get(temp_tag.hash())?.ok_or_else(|| {
ActorError::Inconsistent("inline data not found".to_owned())
})?;
tracing::trace!("exporting inline data to {}", target.display());
tx.send(std::fs::write(&target, data.value()).map_err(|e| e.into()))
.ok();
}
DataLocation::Owned(size) => {
let path = self.options.path.owned_data_path(temp_tag.hash());
match mode {
ExportMode::Copy => {
// copy in an external thread
self.rt.spawn_blocking(move || {
tx.send(export_file_copy(temp_tag, path, size, target, progress))
.ok();
});
} else {
match std::fs::rename(&path, &target) {
Ok(()) => {
let entry = EntryState::Complete {
data_location: DataLocation::External(vec![target], size),
outboard_location,
};
drop(guard);
tables.blobs.insert(temp_tag.hash(), entry)?;
drop(temp_tag);
tx.send(Ok(())).ok();
}
Err(e) => {
}
ExportMode::TryReference => match std::fs::rename(&path, &target) {
Ok(()) => {
let entry = EntryState::Complete {
data_location: DataLocation::External(vec![target], size),
outboard_location,
};
drop(guard);
tables.blobs.insert(temp_tag.hash(), entry)?;
drop(temp_tag);
tx.send(Ok(())).ok();
}
Err(e) => {
const ERR_CROSS: i32 = 18;
if e.raw_os_error() == Some(ERR_CROSS) {
// Cross device renaming failed, copy instead
match std::fs::copy(&path, &target) {
Ok(_) => {
let entry = EntryState::Complete {
data_location: DataLocation::External(
vec![target],
size,
),
outboard_location,
};

drop(guard);
tables.blobs.insert(temp_tag.hash(), entry)?;
tables
.delete_after_commit
.insert(*temp_tag.hash(), [BaoFilePart::Data]);
drop(temp_tag);

tx.send(Ok(())).ok();
}
Err(e) => {
drop(temp_tag);
tx.send(Err(e.into())).ok();
}
}
} else {
drop(temp_tag);
tx.send(Err(e.into())).ok();
}
}
}
},
}
DataLocation::External(paths, size) => {
let path = paths
.first()
.ok_or_else(|| {
ActorError::Inconsistent("external path missing".to_owned())
})?
.to_owned();
// we can not reference external files, so we just copy them. But this does not have to happen in the actor.
if path == target {
// export to the same path, nothing to do
tx.send(Ok(())).ok();
} else {
// copy in an external thread
self.rt.spawn_blocking(move || {
tx.send(export_file_copy(temp_tag, path, size, target, progress))
.ok();
});
}
}
DataLocation::External(paths, size) => {
let path = paths
.first()
.ok_or_else(|| {
ActorError::Inconsistent("external path missing".to_owned())
})?
.to_owned();
// we can not reference external files, so we just copy them. But this does not have to happen in the actor.
if path == target {
// export to the same path, nothing to do
tx.send(Ok(())).ok();
} else {
// copy in an external thread
self.rt.spawn_blocking(move || {
tx.send(export_file_copy(temp_tag, path, size, target, progress))
.ok();
});
}
}
}
},
EntryState::Partial { .. } => {
return Err(io::Error::new(io::ErrorKind::Unsupported, "partial entry").into());
}
@@ -2273,7 +2301,7 @@ impl ActorState {
}
}

/// Export a file by copyign out its content to a new location
/// Export a file by copying out its content to a new location
fn export_file_copy(
temp_tag: TempTag,
path: PathBuf,
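The ExportMode::TryReference branch above first attempts a rename and only falls back to a copy when the rename fails with raw OS error 18 (EXDEV, cross-device link); the store then keeps the entry as External and schedules the now-redundant owned file for deletion after commit. A minimal standalone sketch of that fallback pattern outside the store (paths are placeholders, and unlike the actor code it simply removes the source right away instead of deferring the delete):

use std::{fs, io, path::Path};

/// Move `src` to `dst`, copying when the two paths are on different
/// filesystems (rename then fails with EXDEV, raw OS error 18 on Linux).
fn move_or_copy(src: &Path, dst: &Path) -> io::Result<()> {
    match fs::rename(src, dst) {
        Ok(()) => Ok(()),
        Err(e) if e.raw_os_error() == Some(18) => {
            fs::copy(src, dst)?;
            fs::remove_file(src)?;
            Ok(())
        }
        Err(e) => Err(e),
    }
}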
2 changes: 1 addition & 1 deletion iroh-dns-server/examples/publish.rs
@@ -75,7 +75,7 @@ async fn main() -> Result<()> {
println!("publish to {pkarr_relay} ...");

let pkarr = PkarrRelayClient::new(pkarr_relay);
let node_info = NodeInfo::new(node_id, Some(args.relay_url));
let node_info = NodeInfo::new(node_id, Some(args.relay_url), Default::default());
let signed_packet = node_info.to_pkarr_signed_packet(&secret_key, 30)?;
pkarr.publish(&signed_packet).await?;

2 changes: 1 addition & 1 deletion iroh-dns-server/src/lib.rs
@@ -162,7 +162,7 @@ mod tests {
let node_id = secret_key.public();
let relay_url: Url = "https://relay.example.".parse()?;
let pkarr = PkarrRelayClient::new(pkarr_relay);
let node_info = NodeInfo::new(node_id, Some(relay_url.clone()));
let node_info = NodeInfo::new(node_id, Some(relay_url.clone()), Default::default());
let signed_packet = node_info.to_pkarr_signed_packet(&secret_key, 30)?;

pkarr.publish(&signed_packet).await?;
1 change: 1 addition & 0 deletions iroh-net/src/discovery.rs
@@ -590,6 +590,7 @@ mod test_dns_pkarr {
let node_info = NodeInfo::new(
secret_key.public(),
Some("https://relay.example".parse().unwrap()),
Default::default(),
);
let signed_packet = node_info.to_pkarr_signed_packet(&secret_key, 30)?;
state.upsert(signed_packet)?;
6 changes: 5 additions & 1 deletion iroh-net/src/discovery/pkarr_publish.rs
@@ -91,7 +91,11 @@ impl PkarrPublisher {
///
/// This is a nonblocking function, the actual update is performed in the background.
pub fn update_addr_info(&self, info: &AddrInfo) {
let info = NodeInfo::new(self.node_id, info.relay_url.clone().map(Into::into));
let info = NodeInfo::new(
self.node_id,
info.relay_url.clone().map(Into::into),
Default::default(),
);
self.watchable.update(Some(info)).ok();
}
}
77 changes: 72 additions & 5 deletions iroh-net/src/dns/node_info.rs
@@ -1,7 +1,13 @@
//! This module contains functions and structs to lookup node information from DNS
//! and to encode node information in Pkarr signed packets.
use std::{collections::BTreeMap, fmt::Display, hash::Hash, str::FromStr};
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Display,
hash::Hash,
net::SocketAddr,
str::FromStr,
};

use anyhow::{anyhow, ensure, Result};
use hickory_proto::error::ProtoError;
@@ -21,6 +27,8 @@ pub const IROH_TXT_NAME: &str = "_iroh";
pub enum IrohAttr {
/// `relay`: URL of home relay
Relay,
/// `addr`: Direct address
Addr,
}

/// Lookup node info by domain name
@@ -69,6 +77,8 @@ pub struct NodeInfo {
/// Home relay server for this node
#[debug("{:?}", self.relay_url.as_ref().map(|s| s.to_string()))]
pub relay_url: Option<Url>,
/// Direct addresses
pub direct_addresses: BTreeSet<SocketAddr>,
}

impl From<TxtAttrs<IrohAttr>> for NodeInfo {
@@ -87,7 +97,17 @@ impl From<&TxtAttrs<IrohAttr>> for NodeInfo {
.flatten()
.next()
.and_then(|s| Url::parse(s).ok());
Self { node_id, relay_url }
let direct_addresses = attrs
.get(&IrohAttr::Addr)
.into_iter()
.flatten()
.filter_map(|s| SocketAddr::from_str(s).ok())
.collect();
Self {
node_id,
relay_url,
direct_addresses,
}
}
}

@@ -97,6 +117,9 @@ impl From<&NodeInfo> for TxtAttrs<IrohAttr> {
if let Some(relay_url) = &info.relay_url {
attrs.push((IrohAttr::Relay, relay_url.to_string()));
}
for addr in &info.direct_addresses {
attrs.push((IrohAttr::Addr, addr.to_string()));
}
Self::from_parts(info.node_id, attrs.into_iter())
}
}
@@ -114,15 +137,23 @@ impl From<NodeInfo> for AddrInfo {
fn from(value: NodeInfo) -> Self {
AddrInfo {
relay_url: value.relay_url.map(|u| u.into()),
direct_addresses: Default::default(),
direct_addresses: value.direct_addresses,
}
}
}

impl NodeInfo {
/// Create a new [`NodeInfo`] from its parts.
pub fn new(node_id: NodeId, relay_url: Option<Url>) -> Self {
Self { node_id, relay_url }
pub fn new(
node_id: NodeId,
relay_url: Option<Url>,
direct_addresses: BTreeSet<SocketAddr>,
) -> Self {
Self {
node_id,
relay_url,
direct_addresses,
}
}

fn to_attrs(&self) -> TxtAttrs<IrohAttr> {
@@ -360,3 +391,39 @@ fn node_domain(node_id: &NodeId, origin: &str) -> Result<Name> {
let domain = Name::from_str(&domain)?;
Ok(domain)
}

#[cfg(test)]
mod tests {
use iroh_base::key::SecretKey;
use std::str::FromStr;

use super::NodeInfo;

#[test]
fn txt_attr_roundtrip() {
let expected = NodeInfo {
node_id: "vpnk377obfvzlipnsfbqba7ywkkenc4xlpmovt5tsfujoa75zqia"
.parse()
.unwrap(),
relay_url: Some("https://example.com".parse().unwrap()),
direct_addresses: ["127.0.0.1:1234".parse().unwrap()].into_iter().collect(),
};
let attrs = expected.to_attrs();
let actual = NodeInfo::from(&attrs);
assert_eq!(expected, actual);
}

#[test]
fn signed_packet_roundtrip() {
let secret_key =
SecretKey::from_str("vpnk377obfvzlipnsfbqba7ywkkenc4xlpmovt5tsfujoa75zqia").unwrap();
let expected = NodeInfo {
node_id: secret_key.public(),
relay_url: Some("https://example.com".parse().unwrap()),
direct_addresses: ["127.0.0.1:1234".parse().unwrap()].into_iter().collect(),
};
let packet = expected.to_pkarr_signed_packet(&secret_key, 30).unwrap();
let actual = NodeInfo::from_pkarr_signed_packet(&packet).unwrap();
assert_eq!(expected, actual);
}
}
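
The new three-argument constructor, together with the call-site updates in publish.rs, lib.rs, discovery.rs, and pkarr_publish.rs above, means direct addresses now travel through the pkarr signed packet alongside the relay URL. A hedged sketch of that publish path, assuming NodeInfo is importable from iroh_net::dns::node_info (the socket address and relay URL are placeholders):

use std::{collections::BTreeSet, net::SocketAddr};

use anyhow::Result;
use iroh_base::key::SecretKey;
// Assumed import path; NodeInfo is defined in iroh-net/src/dns/node_info.rs in this diff.
use iroh_net::dns::node_info::NodeInfo;

fn build_signed_packet(secret_key: &SecretKey) -> Result<()> {
    // Direct addresses are now part of NodeInfo and are encoded as `addr` TXT attributes.
    let direct_addresses: BTreeSet<SocketAddr> =
        ["192.0.2.1:4433".parse()?].into_iter().collect();
    let info = NodeInfo::new(
        secret_key.public(),
        Some("https://relay.example".parse()?),
        direct_addresses,
    );
    // 30 is the TTL in seconds, matching the call sites above; the packet can then
    // be published via PkarrRelayClient as in iroh-dns-server/examples/publish.rs.
    let _signed_packet = info.to_pkarr_signed_packet(secret_key, 30)?;
    Ok(())
}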
