Skip to content

Commit

Permalink
feat: Dont poll for incoming files if we dont have a callback
Browse files Browse the repository at this point in the history
  • Loading branch information
Threated committed Feb 20, 2024
1 parent 51265c1 commit 038a6d0
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use clap::Parser;
use config::Config;
use futures_util::stream::TryStreamExt;
use once_cell::sync::Lazy;
use reqwest::Client;
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use tokio_util::io::{ReaderStream, StreamReader};

Expand All @@ -30,17 +30,21 @@ pub static CLIENT: Lazy<Client> = Lazy::new(Client::new);

#[tokio::main]
async fn main() {
let app = Router::new().route("/send/:to", post(accept_file));
let app = Router::new().route("/send/:to", post(send_file));
let server = axum::Server::bind(&CONFIG.bind_addr)
.serve(app.into_make_service())
.with_graceful_shutdown(async { tokio::signal::ctrl_c().await.unwrap() });
let (server_res, _) = tokio::join!(server, wait_for_files());
let (server_res, _) = tokio::join!(server, wait_for_files(&CONFIG));
if let Err(e) = server_res {
eprintln!("Server errored: {e}");
}
}

pub async fn wait_for_files() {
pub async fn wait_for_files(config: &Config) {
let Some(ref cb) = config.callback else {
println!("No callback url registered only sending files.");
return;
};
let mut abort = pin!(tokio::signal::ctrl_c());
let block = BlockingOptions::from_count(1);
loop {
Expand All @@ -62,12 +66,12 @@ pub async fn wait_for_files() {
}
};
for task in tasks {
forward_send(task).await;
forward_file(task, cb).await;
}
}
}

pub async fn forward_send(socket_task: SocketTask) {
pub async fn forward_file(socket_task: SocketTask, cb: &Url) {
let FileMetadata { related_headers } = serde_json::from_value(socket_task.metadata).expect("We only ever create this ourselves");
let incoming = match BEAM_CLIENT.connect_socket(&socket_task.id).await {
Ok(v) => v,
Expand All @@ -77,7 +81,7 @@ pub async fn forward_send(socket_task: SocketTask) {
}
};
let res = CLIENT
.post(CONFIG.callback.clone())
.post(cb.clone())
.headers(related_headers)
.body(reqwest::Body::wrap_stream(ReaderStream::new(incoming)))
.send()
Expand All @@ -87,7 +91,7 @@ pub async fn forward_send(socket_task: SocketTask) {
"Got unsuccessful status code from callback server: {}",
r.status()
),
Err(e) => eprintln!("Failed to send file to {}: {e}", CONFIG.callback),
Err(e) => eprintln!("Failed to send file to {cb}: {e}"),
_ => {}
}
}
Expand All @@ -98,7 +102,7 @@ struct FileMetadata {
related_headers: HeaderMap,
}

async fn accept_file(
async fn send_file(
Path(other_proxy_name): Path<String>,
headers: HeaderMap,
body: BodyStream,
Expand Down

0 comments on commit 038a6d0

Please sign in to comment.