From 5f78fd131ed1cd2254518feb85d9ea165177e49f Mon Sep 17 00:00:00 2001 From: Jack Allnutt Date: Thu, 10 Aug 2023 03:50:01 +0100 Subject: [PATCH 1/2] Add /ingest relay endpoint This adds a new `/ingest` endpoint to allow instances to opt-in to relaying their users' posts to a buzzrelay instance without necessarily subscribing to receive posts from the relay --- src/actor.rs | 6 ++++++ src/main.rs | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++- src/relay.rs | 9 +++++++-- 3 files changed, 63 insertions(+), 3 deletions(-) diff --git a/src/actor.rs b/src/actor.rs index 843e4ee..a41e355 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -8,6 +8,7 @@ use crate::activitypub; pub enum ActorKind { TagRelay(String), InstanceRelay(String), + IngestRelay, } impl ActorKind { @@ -32,6 +33,7 @@ impl Actor { format!("https://{}/tag/{}", self.host, tag), ActorKind::InstanceRelay(instance) => format!("https://{}/instance/{}", self.host, instance), + ActorKind::IngestRelay => format!("https://{}/ingest", self.host), } } @@ -49,6 +51,8 @@ impl Actor { format!("#{}", tag), ActorKind::InstanceRelay(instance) => instance.to_string(), + ActorKind::IngestRelay => + self.host.to_string() }), icon: Some(activitypub::Media { media_type: "Image".to_string(), @@ -67,6 +71,8 @@ impl Actor { format!("tag-{}", tag), ActorKind::InstanceRelay(instance) => format!("instance-{}", instance), + ActorKind::IngestRelay => + String::from(env!("CARGO_PKG_NAME")), }), } } diff --git a/src/main.rs b/src/main.rs index 572952d..75e2233 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ use sigh::{PrivateKey, PublicKey}; use std::{net::SocketAddr, sync::Arc, time::Duration, collections::HashMap}; use std::{panic, process}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use tokio::sync::mpsc::{channel, Sender}; mod error; mod config; @@ -34,6 +35,7 @@ struct State { hostname: Arc, priv_key: PrivateKey, pub_key: PublicKey, + ingest_tx: Sender, } @@ -69,6 +71,9 @@ async fn webfinger( let at = resource.find('@'); (actor::ActorKind::InstanceRelay(resource[off..at.unwrap_or(resource.len())].to_string()), at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string()))) + } else if resource.starts_with("acct:ingest") { + let at = resource.find('@'); + (actor::ActorKind::IngestRelay, at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string()))) } else { track_request("GET", "webfinger", "not_found"); return StatusCode::NOT_FOUND.into_response(); @@ -117,6 +122,18 @@ async fn get_instance_actor( .into_response() } +async fn get_ingest_actor( + axum::extract::State(state): axum::extract::State, +) -> Response { + track_request("GET", "actor", "ingest"); + let target = actor::Actor { + host: state.hostname.clone(), + kind: actor::ActorKind::IngestRelay, + }; + target.as_activitypub(&state.pub_key) + .into_response() +} + async fn post_tag_relay( axum::extract::State(state): axum::extract::State, Path(tag): Path, @@ -141,6 +158,17 @@ async fn post_instance_relay( post_relay(state, endpoint, target).await } +async fn post_ingest_relay( + axum::extract::State(state): axum::extract::State, + endpoint: endpoint::Endpoint<'_> +) -> Response{ + let target = actor::Actor { + host: state.hostname.clone(), + kind: actor::ActorKind::IngestRelay, + }; + post_relay(state, endpoint, target).await +} + async fn post_relay( state: State, endpoint: endpoint::Endpoint<'_>, @@ -242,6 +270,23 @@ async fn post_relay( ).into_response() } } + } else if let actor::ActorKind::IngestRelay = target.kind { + match state.ingest_tx.send(endpoint.payload.to_string()).await { + Ok(()) => { + track_request("POST", "relay", "ingest"); + (StatusCode::ACCEPTED, + [("content-type", "application/activity+json")], + "{}" + ).into_response() + }, + Err(e) => { + tracing::error!("ingest_tx.send: {}", e); + track_request("POST", "relay", "ingest"); + (StatusCode::INTERNAL_SERVER_ERROR, + format!("{}", e) + ).into_response() + } + } } else { track_request("POST", "relay", "unrecognized"); (StatusCode::BAD_REQUEST, "Not a recognized request").into_response() @@ -345,13 +390,16 @@ async fn main() { .unwrap() ); let hostname = Arc::new(config.hostname.clone()); - relay::spawn(client.clone(), hostname.clone(), database.clone(), priv_key.clone(), stream_rx); + let (ingest_tx, ingest_rx) = channel::(1024); + relay::spawn(client.clone(), hostname.clone(), database.clone(), priv_key.clone(), stream_rx, ingest_rx); let app = Router::new() .route("/tag/:tag", get(get_tag_actor).post(post_tag_relay)) .route("/instance/:instance", get(get_instance_actor).post(post_instance_relay)) + .route("/ingest", get(get_ingest_actor).post(post_ingest_relay)) .route("/tag/:tag/outbox", get(outbox)) .route("/instance/:instance/outbox", get(outbox)) + .route("/ingest/outbox", get(outbox)) .route("/.well-known/webfinger", get(webfinger)) .route("/.well-known/nodeinfo", get(nodeinfo)) .route("/metrics", get(|| async move { @@ -363,6 +411,7 @@ async fn main() { hostname, priv_key, pub_key, + ingest_tx }) .merge(SpaRouter::new("/", "static")); diff --git a/src/relay.rs b/src/relay.rs index f453c92..05969e9 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -6,6 +6,7 @@ use serde_json::json; use sigh::PrivateKey; use tokio::{ sync::mpsc::Receiver, + select, }; use crate::{db::Database, send, actor}; @@ -147,14 +148,18 @@ pub fn spawn( hostname: Arc, database: Database, private_key: PrivateKey, - mut stream_rx: Receiver + mut stream_rx: Receiver, + mut ingest_rx: Receiver, ) { let private_key = Arc::new(private_key); tokio::spawn(async move { let mut workers = HashMap::new(); - while let Some(data) = stream_rx.recv().await { + while let Some(data) = select!{ + data = stream_rx.recv() => data, + data = ingest_rx.recv() => data, + } { let t1 = Instant::now(); let post: Post = match serde_json::from_str(&data) { Ok(post) => post, From d514c224ffde5bfc73f03c117688ac5f70c4799a Mon Sep 17 00:00:00 2001 From: Jack Allnutt Date: Thu, 10 Aug 2023 04:16:34 +0100 Subject: [PATCH 2/2] Fix clippy warning --- src/actor.rs | 26 +++++++++++++------------- src/main.rs | 16 ++++++++-------- src/relay.rs | 22 +++++++++++----------- 3 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/actor.rs b/src/actor.rs index a41e355..3471963 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -6,9 +6,9 @@ use crate::activitypub; #[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub enum ActorKind { - TagRelay(String), - InstanceRelay(String), - IngestRelay, + Tag(String), + Instance(String), + Ingest, } impl ActorKind { @@ -16,7 +16,7 @@ impl ActorKind { let tag = deunicode(tag) .to_lowercase() .replace(char::is_whitespace, ""); - ActorKind::TagRelay(tag) + ActorKind::Tag(tag) } } @@ -29,11 +29,11 @@ pub struct Actor { impl Actor { pub fn uri(&self) -> String { match &self.kind { - ActorKind::TagRelay(tag) => + ActorKind::Tag(tag) => format!("https://{}/tag/{}", self.host, tag), - ActorKind::InstanceRelay(instance) => + ActorKind::Instance(instance) => format!("https://{}/instance/{}", self.host, instance), - ActorKind::IngestRelay => format!("https://{}/ingest", self.host), + ActorKind::Ingest => format!("https://{}/ingest", self.host), } } @@ -47,11 +47,11 @@ impl Actor { actor_type: "Service".to_string(), id: self.uri(), name: Some(match &self.kind { - ActorKind::TagRelay(tag) => + ActorKind::Tag(tag) => format!("#{}", tag), - ActorKind::InstanceRelay(instance) => + ActorKind::Instance(instance) => instance.to_string(), - ActorKind::IngestRelay => + ActorKind::Ingest => self.host.to_string() }), icon: Some(activitypub::Media { @@ -67,11 +67,11 @@ impl Actor { pem: pub_key.to_pem().unwrap(), }, preferred_username: Some(match &self.kind { - ActorKind::TagRelay(tag) => + ActorKind::Tag(tag) => format!("tag-{}", tag), - ActorKind::InstanceRelay(instance) => + ActorKind::Instance(instance) => format!("instance-{}", instance), - ActorKind::IngestRelay => + ActorKind::Ingest => String::from(env!("CARGO_PKG_NAME")), }), } diff --git a/src/main.rs b/src/main.rs index 75e2233..61c01cc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -64,16 +64,16 @@ async fn webfinger( if resource.starts_with("acct:tag-") { let off = "acct:tag-".len(); let at = resource.find('@'); - (actor::ActorKind::TagRelay(resource[off..at.unwrap_or(resource.len())].to_string()), + (actor::ActorKind::Tag(resource[off..at.unwrap_or(resource.len())].to_string()), at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string()))) } else if resource.starts_with("acct:instance-") { let off = "acct:instance-".len(); let at = resource.find('@'); - (actor::ActorKind::InstanceRelay(resource[off..at.unwrap_or(resource.len())].to_string()), + (actor::ActorKind::Instance(resource[off..at.unwrap_or(resource.len())].to_string()), at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string()))) } else if resource.starts_with("acct:ingest") { let at = resource.find('@'); - (actor::ActorKind::IngestRelay, at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string()))) + (actor::ActorKind::Ingest, at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string()))) } else { track_request("GET", "webfinger", "not_found"); return StatusCode::NOT_FOUND.into_response(); @@ -116,7 +116,7 @@ async fn get_instance_actor( track_request("GET", "actor", "instance"); let target = actor::Actor { host: state.hostname.clone(), - kind: actor::ActorKind::InstanceRelay(instance.to_lowercase()), + kind: actor::ActorKind::Instance(instance.to_lowercase()), }; target.as_activitypub(&state.pub_key) .into_response() @@ -128,7 +128,7 @@ async fn get_ingest_actor( track_request("GET", "actor", "ingest"); let target = actor::Actor { host: state.hostname.clone(), - kind: actor::ActorKind::IngestRelay, + kind: actor::ActorKind::Ingest, }; target.as_activitypub(&state.pub_key) .into_response() @@ -153,7 +153,7 @@ async fn post_instance_relay( ) -> Response { let target = actor::Actor { host: state.hostname.clone(), - kind: actor::ActorKind::InstanceRelay(instance.to_lowercase()), + kind: actor::ActorKind::Instance(instance.to_lowercase()), }; post_relay(state, endpoint, target).await } @@ -164,7 +164,7 @@ async fn post_ingest_relay( ) -> Response{ let target = actor::Actor { host: state.hostname.clone(), - kind: actor::ActorKind::IngestRelay, + kind: actor::ActorKind::Ingest, }; post_relay(state, endpoint, target).await } @@ -270,7 +270,7 @@ async fn post_relay( ).into_response() } } - } else if let actor::ActorKind::IngestRelay = target.kind { + } else if let actor::ActorKind::Ingest = target.kind { match state.ingest_tx.send(endpoint.payload.to_string()).await { Ok(()) => { track_request("POST", "relay", "ingest"); diff --git a/src/relay.rs b/src/relay.rs index 05969e9..4bf5bc5 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -40,7 +40,7 @@ impl Post<'_> { fn relay_target_kinds(&self) -> impl Iterator { self.host() .into_iter() - .map(actor::ActorKind::InstanceRelay) + .map(actor::ActorKind::Instance) .chain( self.tags() .into_iter() @@ -259,8 +259,8 @@ mod test { }]), }; let mut kinds = post.relay_target_kinds(); - assert_eq!(kinds.next(), Some(ActorKind::InstanceRelay("example.com".to_string()))); - assert_eq!(kinds.next(), Some(ActorKind::TagRelay("foo".to_string()))); + assert_eq!(kinds.next(), Some(ActorKind::Instance("example.com".to_string()))); + assert_eq!(kinds.next(), Some(ActorKind::Tag("foo".to_string()))); assert_eq!(kinds.next(), None); } @@ -274,7 +274,7 @@ mod test { }]), }; let mut kinds = post.relay_target_kinds(); - assert_eq!(kinds.next(), Some(ActorKind::InstanceRelay("example.com".to_string()))); + assert_eq!(kinds.next(), Some(ActorKind::Instance("example.com".to_string()))); assert_eq!(kinds.next(), None); } @@ -288,8 +288,8 @@ mod test { }]), }; let mut kinds = post.relay_target_kinds(); - assert_eq!(kinds.next(), Some(ActorKind::InstanceRelay("example.com".to_string()))); - assert_eq!(kinds.next(), Some(ActorKind::TagRelay("23".to_string()))); + assert_eq!(kinds.next(), Some(ActorKind::Instance("example.com".to_string()))); + assert_eq!(kinds.next(), Some(ActorKind::Tag("23".to_string()))); assert_eq!(kinds.next(), None); } @@ -303,9 +303,9 @@ mod test { }]), }; let mut kinds = post.relay_target_kinds(); - assert_eq!(kinds.next(), Some(ActorKind::InstanceRelay("example.com".to_string()))); - assert_eq!(kinds.next(), Some(ActorKind::TagRelay("dd1302".to_string()))); - assert_eq!(kinds.next(), Some(ActorKind::TagRelay("dd".to_string()))); + assert_eq!(kinds.next(), Some(ActorKind::Instance("example.com".to_string()))); + assert_eq!(kinds.next(), Some(ActorKind::Tag("dd1302".to_string()))); + assert_eq!(kinds.next(), Some(ActorKind::Tag("dd".to_string()))); assert_eq!(kinds.next(), None); } @@ -319,8 +319,8 @@ mod test { }]), }; let mut kinds = post.relay_target_kinds(); - assert_eq!(kinds.next(), Some(ActorKind::InstanceRelay("example.com".to_string()))); - assert_eq!(kinds.next(), Some(ActorKind::TagRelay("sukoteitusiyuhuorudoronguhea".to_string()))); + assert_eq!(kinds.next(), Some(ActorKind::Instance("example.com".to_string()))); + assert_eq!(kinds.next(), Some(ActorKind::Tag("sukoteitusiyuhuorudoronguhea".to_string()))); assert_eq!(kinds.next(), None); } }