Skip to content

Commit

Permalink
refactor: use client from config
Browse files Browse the repository at this point in the history
  • Loading branch information
Threated committed Nov 8, 2024
1 parent 1802211 commit 1b61d8e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 22 deletions.
36 changes: 17 additions & 19 deletions src/logic_reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,44 @@ use beam_lib::{AppOrProxyId, TaskRequest, TaskResult, WorkStatus};
use hyper::{header, StatusCode, Uri, Method, http::uri::PathAndQuery};
use tracing::{debug, field, info, trace, warn, Instrument, Span};
use serde_json::Value;
use reqwest::{Client, Response};
use reqwest::Response;

use crate::{config::Config, errors::BeamConnectError, msg::{HttpResponse, HttpRequest}};

pub(crate) async fn process_requests(config: &'static Config, client: Client) -> Result<(), BeamConnectError> {
pub(crate) async fn process_requests(config: &'static Config) -> Result<(), BeamConnectError> {
// Fetch tasks from Proxy
let task = fetch_task(&config, &client).await?;
claim_or_answer(task, &config, client).await?;
let task = fetch_task(&config).await?;
claim_or_answer(task, &config).await?;

Ok(())
}

#[tracing::instrument(skip_all, fields(from = %task.from.hide_broker(), method = %task.body.method, orig_url = %task.body.url, dst_url))]
async fn claim_or_answer(task: TaskRequest<HttpRequest>, config: &'static Config, client: Client) -> Result<(), BeamConnectError> {
async fn claim_or_answer(task: TaskRequest<HttpRequest>, config: &'static Config) -> Result<(), BeamConnectError> {
let task = Arc::new(task);
let task2 = Arc::clone(&task);
let client2 = client.clone();
let mut execute_task = Box::pin(async move {
execute_http_task(&task2, &config, client2).await
execute_http_task(&task2, &config).await
});
let mut claim_task = pin!(claim_task(&task, &config, &client));
let mut claim_task = pin!(claim_task(&task, &config));
tokio::select! {
claimed = &mut claim_task => {
claimed?;
let task = Arc::clone(&task);
let client = client.clone();
tokio::spawn(async move {
if let Err(e) = send_reply(&task, &config, &client, execute_task.await).await {
if let Err(e) = send_reply(&task, &config, execute_task.await).await {
warn!("Failed to send execution result: {e}");
}
}.instrument(Span::current()));
Ok(())
},
resp = &mut execute_task => {
send_reply(&task, &config, &client, resp).await
send_reply(&task, &config, resp).await
}
}
}

async fn claim_task<T>(task: &TaskRequest<T>, config: &Config, client: &Client) -> Result<(), BeamConnectError> {
async fn claim_task<T>(task: &TaskRequest<T>, config: &Config) -> Result<(), BeamConnectError> {
let msg = TaskResult {
from: config.my_app_id.clone().into(),
to: vec![task.from.clone()],
Expand All @@ -53,7 +51,7 @@ async fn claim_task<T>(task: &TaskRequest<T>, config: &Config, client: &Client)
body: (),
};
debug!("Claiming: {msg:?}");
let resp = client
let resp = config.client
.put(format!("{}v1/tasks/{}/results/{}", config.proxy_url, task.id, config.my_app_id.clone()))
.header(header::AUTHORIZATION, config.proxy_auth.clone())
.json(&msg)
Expand All @@ -68,7 +66,7 @@ async fn claim_task<T>(task: &TaskRequest<T>, config: &Config, client: &Client)
}
}

async fn send_reply(task: &TaskRequest<HttpRequest>, config: &Config, client: &Client, resp: Result<Response, BeamConnectError>) -> Result<(), BeamConnectError> {
async fn send_reply(task: &TaskRequest<HttpRequest>, config: &Config, resp: Result<Response, BeamConnectError>) -> Result<(), BeamConnectError> {
let (reply_body, status) = match resp {
Ok(resp) => {
let status = resp.status();
Expand Down Expand Up @@ -103,7 +101,7 @@ async fn send_reply(task: &TaskRequest<HttpRequest>, config: &Config, client: &C
body: reply_body,
};
debug!("Delivering response to Proxy: {msg:?}");
let resp = client
let resp = config.client
.put(format!("{}v1/tasks/{}/results/{}", config.proxy_url, task.id, config.my_app_id.clone()))
.header(header::AUTHORIZATION, config.proxy_auth.clone())
.json(&msg)
Expand All @@ -119,7 +117,7 @@ async fn send_reply(task: &TaskRequest<HttpRequest>, config: &Config, client: &C
}

// TODO: Take ownership of `task` to save clones
async fn execute_http_task(task: &TaskRequest<HttpRequest>, config: &Config, client: Client) -> Result<Response, BeamConnectError> {
async fn execute_http_task(task: &TaskRequest<HttpRequest>, config: &Config) -> Result<Response, BeamConnectError> {
let task_req = &task.body;
let span = Span::current();
span.record("method", field::display(&task_req.method));
Expand Down Expand Up @@ -167,7 +165,7 @@ async fn execute_http_task(task: &TaskRequest<HttpRequest>, config: &Config, cli
headers.remove(header::HOST);
}
info!("Executing");
let resp = client
let resp = config.client
.request(task_req.method.clone(), uri.to_string())
.headers(headers)
.body(task_req.body.to_vec())
Expand All @@ -177,9 +175,9 @@ async fn execute_http_task(task: &TaskRequest<HttpRequest>, config: &Config, cli
Ok(resp)
}

async fn fetch_task(config: &Config, client: &Client) -> Result<TaskRequest<HttpRequest>, BeamConnectError> {
async fn fetch_task(config: &Config) -> Result<TaskRequest<HttpRequest>, BeamConnectError> {
debug!("fetching requests from proxy");
let resp = client
let resp = config.client
.get(format!("{}v1/tasks?to={}&wait_count=1&filter=todo", config.proxy_url, config.my_app_id))
.header(header::AUTHORIZATION, config.proxy_auth.clone())
.header(header::ACCEPT, "application/json")
Expand Down
4 changes: 1 addition & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ async fn main() -> anyhow::Result<()> {
banner::print_banner();
let config = Config::load().await?;
let config: &'static _ = Box::leak(Box::new(config));
let client = config.client.clone();
let client2 = client.clone();
banner::print_startup_app_config(&config);

info!("Global site discovery: {:?}", config.targets_public);
Expand All @@ -42,7 +40,7 @@ async fn main() -> anyhow::Result<()> {
let mut timer= std::pin::pin!(tokio::time::sleep(Duration::from_secs(60)));
loop {
debug!("Waiting for next request ...");
if let Err(e) = logic_reply::process_requests(config, client2.clone()).await {
if let Err(e) = logic_reply::process_requests(config).await {
match e {
BeamConnectError::ProxyTimeoutError => {
debug!("{e}");
Expand Down

0 comments on commit 1b61d8e

Please sign in to comment.