Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,953 changes: 918 additions & 1,035 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion reflection-doc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl Service {
path.as_deref(),
// gio::NetworkManager is slow to initialize the `network-available` property,
// so it might be incorrect therefore always start with no connection.
node::ConnectionMode::None,
node::ConnectionMode::Network,
)
.await?;

Expand Down
15 changes: 8 additions & 7 deletions reflection-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ authors = [

[dependencies]
thiserror = "2.0.17"
async-trait = "0.1.89"
chrono = "0.4.42"
ciborium = "0.2.2"
p2panda-core = { git = "https://github.com/p2panda/p2panda" }
p2panda-discovery = { git = "https://github.com/p2panda/p2panda", features = ["mdns"] }
p2panda-net = { git = "https://github.com/p2panda/p2panda" }
p2panda-store = { git = "https://github.com/p2panda/p2panda", features = ["sqlite"], default-features = false}
p2panda-stream = { git = "https://github.com/p2panda/p2panda" }
p2panda-sync = { git = "https://github.com/p2panda/p2panda", features = ["log-sync"] }
p2panda-core = { git = "https://github.com/p2panda/p2panda", branch = "net-rewrite" }
p2panda-discovery = { git = "https://github.com/p2panda/p2panda", branch = "net-rewrite", package = "p2panda-discovery-next"}
p2panda-net = { git = "https://github.com/p2panda/p2panda", branch = "net-rewrite", package = "p2panda-net-next"}
p2panda-store = { git = "https://github.com/p2panda/p2panda", features = ["sqlite"], default-features = false, branch = "net-rewrite"}
p2panda-stream = { git = "https://github.com/p2panda/p2panda", branch = "net-rewrite" }
p2panda-sync = { git = "https://github.com/p2panda/p2panda", branch = "net-rewrite", package = "p2panda-sync-next" }
serde = { version = "1.0.228", features = ["derive"] }
serde_bytes = "0.11.19"
sqlx = { version = "0.8.6", features = ["runtime-tokio", "sqlite", "chrono"], default-features = false}
Expand All @@ -27,3 +26,5 @@ tokio-stream = "0.1.17"
tracing = "0.1"
test-log = { version = "0.2.18", default-features = false, features = ["trace", "color"] }
hex = "0.4.3"
rand_chacha = { version = "0.9.0", features = ["os_rng"] }
iroh = "0.95.1"
22 changes: 7 additions & 15 deletions reflection-node/src/author_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@ use crate::node_inner::NodeInner;
use chrono::Utc;
use p2panda_core::cbor::{DecodeError, decode_cbor, encode_cbor};
use p2panda_core::{PrivateKey, PublicKey};
use p2panda_net::ToNetwork;
use tokio::{
sync::mpsc,
sync::{Mutex, RwLock},
};
use p2panda_net::streams::EphemeralStream;
use tokio::sync::{Mutex, RwLock};
use tracing::error;

const OFFLINE_TIMEOUT: Duration = Duration::from_secs(60);
Expand Down Expand Up @@ -50,7 +47,7 @@ pub struct AuthorTracker<T> {
last_ping: Mutex<HashMap<PublicKey, Instant>>,
document: Arc<T>,
node: Arc<NodeInner>,
tx: RwLock<Option<mpsc::Sender<ToNetwork>>>,
tx: RwLock<Option<EphemeralStream>>,
}

impl<T: SubscribableDocument> AuthorTracker<T> {
Expand All @@ -63,16 +60,15 @@ impl<T: SubscribableDocument> AuthorTracker<T> {
})
}

pub async fn set_document_tx(&self, tx: Option<mpsc::Sender<ToNetwork>>) {
pub async fn set_document_tx(&self, tx: Option<EphemeralStream>) {
let mut tx_guard = self.tx.write().await;
// Send good bye message to the network
if let Some(tx) = tx_guard.as_ref() {
send_message(&self.node.private_key, tx, AuthorMessage::Bye).await;
}

// Set all authors that the tracker has seen to offline, authors the tracker hasn't seen are already offline
let old_authors =
std::mem::replace(self.last_ping.lock().await.deref_mut(), HashMap::new());
let old_authors = std::mem::take(self.last_ping.lock().await.deref_mut());
for author in old_authors.into_keys() {
self.document.author_left(author);
self.set_last_seen(author).await;
Expand Down Expand Up @@ -178,11 +174,7 @@ impl<T: SubscribableDocument> AuthorTracker<T> {
}
}

async fn send_message(
private_key: &PrivateKey,
tx: &mpsc::Sender<ToNetwork>,
message: AuthorMessage,
) {
async fn send_message(private_key: &PrivateKey, tx: &EphemeralStream, message: AuthorMessage) {
// FIXME: We need to add the current time to the message,
// because iroh doesn't broadcast twice the same message message.
let author_message = match encode_cbor(&(&message, SystemTime::now())) {
Expand All @@ -200,7 +192,7 @@ async fn send_message(
return;
}
};
if let Err(error) = tx.send(ToNetwork::Message { bytes }).await {
if let Err(error) = tx.publish(bytes).await {
error!("Failed to sent {message} to the network: {error}");
}
}
59 changes: 10 additions & 49 deletions reflection-node/src/document.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,16 @@
use std::fmt;
use std::hash::Hash;
use std::sync::Arc;

use crate::node_inner::NodeInner;
use crate::operation::ReflectionExtensions;
use crate::operation_store::CreationError;
use crate::subscription_inner::SubscriptionInner;

use p2panda_core::PublicKey;
use p2panda_net::{ToNetwork, TopicId};
use p2panda_sync::TopicQuery;
use serde::{Deserialize, Serialize};
use p2panda_core::{Operation, PublicKey};
use p2panda_net::{TopicId, streams::StreamError};
use thiserror::Error;
use tokio::{
sync::mpsc,
task::{AbortHandle, JoinError},
};
use tokio::task::{AbortHandle, JoinError};
use tracing::info;

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub(crate) struct DocumentId(#[serde(with = "serde_bytes")] [u8; 32]);

impl DocumentId {
pub const fn as_slice(&self) -> &[u8] {
self.0.as_slice()
}
}

impl TopicQuery for DocumentId {}

impl TopicId for DocumentId {
fn id(&self) -> [u8; 32] {
self.0
}
}

impl From<[u8; 32]> for DocumentId {
fn from(bytes: [u8; 32]) -> Self {
Self(bytes)
}
}

impl From<DocumentId> for [u8; 32] {
fn from(id: DocumentId) -> Self {
id.0
}
}

impl fmt::Display for DocumentId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&hex::encode(self.0))
}
}

#[derive(Debug, Error)]
pub enum DocumentError {
#[error(transparent)]
Expand All @@ -61,7 +20,9 @@ pub enum DocumentError {
#[error(transparent)]
Encode(#[from] p2panda_core::cbor::EncodeError),
#[error(transparent)]
Send(#[from] mpsc::error::SendError<ToNetwork>),
Publish(#[from] StreamError<Operation<ReflectionExtensions>>),
#[error(transparent)]
PublishEphemeral(#[from] StreamError<Vec<u8>>),
#[error(transparent)]
Runtime(#[from] JoinError),
}
Expand All @@ -85,7 +46,7 @@ impl<T> Drop for Subscription<T> {
}

impl<T: SubscribableDocument + 'static> Subscription<T> {
pub(crate) async fn new(node: Arc<NodeInner>, id: DocumentId, document: Arc<T>) -> Self {
pub(crate) async fn new(node: Arc<NodeInner>, id: TopicId, document: Arc<T>) -> Self {
let inner = SubscriptionInner::new(node, id, document);

let inner_clone = inner.clone();
Expand All @@ -97,7 +58,7 @@ impl<T: SubscribableDocument + 'static> Subscription<T> {
})
.abort_handle();

info!("Subscribed to document {}", id);
info!("Subscribed to document {}", hex::encode(id));

Subscription {
inner,
Expand Down Expand Up @@ -144,7 +105,7 @@ impl<T: SubscribableDocument + 'static> Subscription<T> {
.spawn(async move { inner.unsubscribe().await })
.await??;

info!("Unsubscribed from document {}", document_id);
info!("Unsubscribed from document {}", hex::encode(document_id));

Ok(())
}
Expand Down
Loading
Loading