diff --git a/bento/crates/api/src/lib.rs b/bento/crates/api/src/lib.rs index 0f047dfaa..0ac033e9b 100644 --- a/bento/crates/api/src/lib.rs +++ b/bento/crates/api/src/lib.rs @@ -10,7 +10,7 @@ use axum::{ extract::{FromRequestParts, Host, Path, State}, http::{StatusCode, request::Parts}, response::{IntoResponse, Response}, - routing::{get, post, put}, + routing::{delete, get, post, put}, }; use bonsai_sdk::responses::{ CreateSessRes, ImgUploadRes, ProofReq, ReceiptDownload, SessionStats, SessionStatusRes, @@ -118,6 +118,9 @@ pub enum AppError { #[error("Database error")] DbError(#[from] TaskDbErr), + #[error("The input does not exist: {0}")] + InputMissing(String), + #[error("internal error")] InternalErr(AnyhowErr), } @@ -133,6 +136,7 @@ impl AppError { Self::ReceiptMissing(_) => "ReceiptMissing", Self::JournalMissing(_) => "JournalMissing", Self::DbError(_) => "DbError", + Self::InputMissing(_) => "InputMissing", Self::InternalErr(_) => "InternalErr", } .into() @@ -152,7 +156,9 @@ impl IntoResponse for AppError { Self::ImgAlreadyExists(_) | Self::InputAlreadyExists(_) | Self::ReceiptAlreadyExists(_) => StatusCode::NO_CONTENT, - Self::ReceiptMissing(_) | Self::JournalMissing(_) => StatusCode::NOT_FOUND, + Self::ReceiptMissing(_) | Self::JournalMissing(_) | Self::InputMissing(_) => { + StatusCode::NOT_FOUND + } Self::InternalErr(_) | Self::DbError(_) => StatusCode::INTERNAL_SERVER_ERROR, }; @@ -335,6 +341,30 @@ async fn input_upload( })) } +const INPUT_DELETE_PATH: &str = "/inputs/:input_id"; +async fn input_delete( + State(state): State>, + Path(input_id): Path, +) -> Result<(), AppError> { + let input_key = format!("{INPUT_BUCKET_DIR}/{input_id}"); + if !state + .s3_client + .object_exists(&input_key) + .await + .context("Failed to check if object exists")? + { + return Err(AppError::InputMissing(input_id.to_string())); + } + + state + .s3_client + .object_delete(&input_key) + .await + .context("Failed to delete input from object store")?; + + Ok(()) +} + const INPUT_UPLOAD_PUT_PATH: &str = "/inputs/upload/:input_id"; async fn input_upload_put( State(state): State>, @@ -839,6 +869,7 @@ pub fn app(state: Arc) -> Router { .route(IMAGE_UPLOAD_PATH, get(image_upload)) .route(IMAGE_UPLOAD_PATH, put(image_upload_put)) .route(INPUT_UPLOAD_PATH, get(input_upload)) + .route(INPUT_DELETE_PATH, delete(input_delete)) .route(INPUT_UPLOAD_PUT_PATH, put(input_upload_put)) .route(RECEIPT_UPLOAD_PATH, get(receipt_upload)) .route(RECEIPT_UPLOAD_PUT_PATH, put(receipt_upload_put)) diff --git a/bento/crates/workflow-common/src/s3.rs b/bento/crates/workflow-common/src/s3.rs index 91d3de0ae..3b11ba45d 100644 --- a/bento/crates/workflow-common/src/s3.rs +++ b/bento/crates/workflow-common/src/s3.rs @@ -174,6 +174,11 @@ impl S3Client { } } + pub async fn object_delete(&self, key: &str) -> Result<()> { + self.client.delete_object().bucket(&self.bucket).key(key).send().await?; + Ok(()) + } + /// List objects in the bucket with optional prefix pub async fn list_objects(&self, prefix: Option<&str>) -> Result> { let mut objects = Vec::new(); diff --git a/crates/broker/src/aggregator.rs b/crates/broker/src/aggregator.rs index 02f90c808..1c60f20ea 100644 --- a/crates/broker/src/aggregator.rs +++ b/crates/broker/src/aggregator.rs @@ -515,8 +515,10 @@ impl AggregatorService { order.order_id ); - if let Err(err) = - self.db.set_order_failure(&order.order_id, "Expired before aggregation").await + if let Err(err) = self + .db + .set_order_failure(&order.order_id, "Expired before aggregation", &self.prover) + .await { tracing::error!( "Failed to set order {} as failed before aggregation: {err}", diff --git a/crates/broker/src/db/fuzz_db.rs b/crates/broker/src/db/fuzz_db.rs index 1c0f1ae2d..a0a9cc56d 100644 --- a/crates/broker/src/db/fuzz_db.rs +++ b/crates/broker/src/db/fuzz_db.rs @@ -30,7 +30,7 @@ use tokio::runtime::Builder; use crate::FulfillmentType; use crate::{db::AggregationOrder, AggregationState, Order, OrderStatus}; -use super::{BrokerDb, SqliteDb}; +use super::{tests::db_test_prover, BrokerDb, SqliteDb}; use boundless_market::contracts::{ Offer, Predicate, ProofRequest, RequestId, RequestInput, RequestInputType, Requirements, @@ -153,6 +153,7 @@ proptest! { let db: Arc = Arc::new( SqliteDb::new(&db_path).await.unwrap() ); + let prover = db_test_prover(); // Create state tracking structure let state = TestState { @@ -165,6 +166,7 @@ proptest! { for ops in operations.chunks(12) { let db = db.clone(); + let prover = prover.clone(); let ops = ops.to_vec(); let state = TestState { added_orders: state.added_orders.clone(), @@ -196,10 +198,10 @@ proptest! { db.get_order(id).await.unwrap(); }, ExistingOrderOperation::SetOrderComplete => { - db.set_order_complete(id).await.unwrap(); + db.set_order_complete(id, &prover).await.unwrap(); }, ExistingOrderOperation::SetOrderFailure => { - db.set_order_failure(id, "test").await.unwrap(); + db.set_order_failure(id, "test", &prover).await.unwrap(); }, ExistingOrderOperation::SetOrderProofId { proof_id } => { db.set_order_proof_id(id, &proof_id).await.unwrap(); diff --git a/crates/broker/src/db/mod.rs b/crates/broker/src/db/mod.rs index ef6a8a25a..01e5ef779 100644 --- a/crates/broker/src/db/mod.rs +++ b/crates/broker/src/db/mod.rs @@ -25,6 +25,7 @@ use thiserror::Error; use crate::{ errors::{impl_coded_debug, CodedError}, + provers::ProverObj, AggregationState, Batch, BatchStatus, FulfillmentType, Order, OrderRequest, OrderStatus, ProofRequest, }; @@ -124,7 +125,11 @@ pub struct AggregationOrder { #[async_trait] pub trait BrokerDb { - async fn insert_skipped_request(&self, order_request: &OrderRequest) -> Result<(), DbError>; + async fn insert_skipped_request( + &self, + order_request: &OrderRequest, + prover: &ProverObj, + ) -> Result<(), DbError>; async fn insert_accepted_request( &self, order_request: &OrderRequest, @@ -137,8 +142,13 @@ pub trait BrokerDb { id: &str, ) -> Result<(ProofRequest, Bytes, String, String, U256, FulfillmentType), DbError>; async fn get_order_compressed_proof_id(&self, id: &str) -> Result; - async fn set_order_failure(&self, id: &str, failure_str: &'static str) -> Result<(), DbError>; - async fn set_order_complete(&self, id: &str) -> Result<(), DbError>; + async fn set_order_failure( + &self, + id: &str, + failure_str: &'static str, + prover: &ProverObj, + ) -> Result<(), DbError>; + async fn set_order_complete(&self, id: &str, prover: &ProverObj) -> Result<(), DbError>; /// Get all orders that are committed to be prove and be fulfilled. async fn get_committed_orders(&self) -> Result, DbError>; /// Get all orders that are committed to be proved but have expired based on their expire_timestamp. @@ -200,6 +210,24 @@ pub trait BrokerDb { async fn set_batch_status(&self, batch_id: usize, status: BatchStatus) -> Result<(), DbError>; } +async fn delete_input_with_context( + prover: &ProverObj, + input_id: &str, + order_id: &str, + context: &str, +) { + match prover.delete_input(input_id).await { + Ok(_) => tracing::info!("Deleted input {} for {} order {}", input_id, context, order_id), + Err(e) => tracing::warn!( + "Failed to delete input {} for {} order {}: {}", + input_id, + context, + order_id, + e + ), + } +} + pub type DbObj = Arc; pub struct SqliteDb { @@ -317,8 +345,16 @@ impl BrokerDb for SqliteDb { } #[instrument(level = "trace", skip_all, fields(id = %format!("{}", order_request.id())))] - async fn insert_skipped_request(&self, order_request: &OrderRequest) -> Result<(), DbError> { - self.insert_order_ignore_duplicates(&order_request.to_skipped_order()).await + async fn insert_skipped_request( + &self, + order_request: &OrderRequest, + prover: &ProverObj, + ) -> Result<(), DbError> { + self.insert_order_ignore_duplicates(&order_request.to_skipped_order()).await?; + if let Some(input_id) = &order_request.input_id { + delete_input_with_context(prover, input_id, &order_request.id(), "skipped").await; + } + Ok(()) } #[instrument(level = "trace", skip_all, fields(id = %format!("{}", order_request.id())))] @@ -388,16 +424,28 @@ impl BrokerDb for SqliteDb { } #[instrument(level = "trace", skip_all, fields(id = %format!("{id}")))] - async fn set_order_failure(&self, id: &str, failure_str: &'static str) -> Result<(), DbError> { + async fn set_order_failure( + &self, + id: &str, + failure_str: &'static str, + prover: &ProverObj, + ) -> Result<(), DbError> { + let input_id = self + .get_order(id) + .await? + .ok_or_else(|| DbError::OrderNotFound(id.to_string()))? + .input_id; let res = sqlx::query( r#" UPDATE orders - SET data = json_set( + SET data = json_remove( + json_set( json_set( json_set(data, '$.status', $1), '$.updated_at', $2), - '$.error_msg', $3) + '$.error_msg', $3), + '$.input_id') WHERE id = $4"#, ) @@ -412,18 +460,29 @@ impl BrokerDb for SqliteDb { return Err(DbError::OrderNotFound(id.to_string())); } + if let Some(input_id) = input_id { + delete_input_with_context(prover, &input_id, id, "failed").await; + } + Ok(()) } #[instrument(level = "trace", skip_all, fields(id = %format!("{id}")))] - async fn set_order_complete(&self, id: &str) -> Result<(), DbError> { + async fn set_order_complete(&self, id: &str, prover: &ProverObj) -> Result<(), DbError> { + let input_id = self + .get_order(id) + .await? + .ok_or_else(|| DbError::OrderNotFound(id.to_string()))? + .input_id; let res = sqlx::query( r#" UPDATE orders - SET data = json_set( + SET data = json_remove( + json_set( json_set(data, '$.status', $1), - '$.updated_at', $2) + '$.updated_at', $2), + '$.input_id') WHERE id = $3"#, ) @@ -437,6 +496,10 @@ impl BrokerDb for SqliteDb { return Err(DbError::OrderNotFound(id.to_string())); } + if let Some(input_id) = input_id { + delete_input_with_context(prover, &input_id, id, "completed").await; + } + Ok(()) } @@ -1057,15 +1120,20 @@ impl BrokerDb for SqliteDb { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use super::*; - use crate::ProofRequest; + use crate::{ + provers::{ProofResult, Prover, ProverError}, + ProofRequest, + }; use alloy::primitives::{Address, Bytes, U256}; + use async_trait::async_trait; use boundless_market::contracts::{ Offer, Predicate, RequestId, RequestInput, RequestInputType, Requirements, }; use risc0_aggregation::GuestState; - use risc0_zkvm::sha::Digest; + use risc0_zkvm::{sha::Digest, Receipt}; + use std::sync::Mutex; use tracing_test::traced_test; fn create_order_request() -> OrderRequest { @@ -1098,6 +1166,86 @@ mod tests { create_order_request().to_proving_order(Default::default()) } + #[derive(Default)] + struct TestProver { + deleted: Arc>>, + } + + impl TestProver { + fn new() -> Self { + Self { deleted: Arc::new(Mutex::new(Vec::new())) } + } + fn deleted_ids(&self) -> Arc>> { + self.deleted.clone() + } + } + + #[async_trait] + impl Prover for TestProver { + async fn has_image(&self, _image_id: &str) -> Result { + unimplemented!("not required for tests") + } + async fn upload_input(&self, _input: Vec) -> Result { + unimplemented!("not required for tests") + } + async fn delete_input(&self, input_id: &str) -> Result<(), ProverError> { + self.deleted.lock().unwrap().push(input_id.to_string()); + Ok(()) + } + async fn upload_image(&self, _image_id: &str, _image: Vec) -> Result<(), ProverError> { + unimplemented!("not required for tests") + } + async fn preflight( + &self, + _image_id: &str, + _input_id: &str, + _assumptions: Vec, + _executor_limit: Option, + _order_id: &str, + ) -> Result { + unimplemented!("not required for tests") + } + async fn prove_stark( + &self, + _image_id: &str, + _input_id: &str, + _assumptions: Vec, + ) -> Result { + unimplemented!("not required for tests") + } + async fn wait_for_stark(&self, _proof_id: &str) -> Result { + unimplemented!("not required for tests") + } + async fn cancel_stark(&self, _proof_id: &str) -> Result<(), ProverError> { + unimplemented!("not required for tests") + } + async fn get_receipt(&self, _proof_id: &str) -> Result, ProverError> { + unimplemented!("not required for tests") + } + async fn get_preflight_journal( + &self, + _proof_id: &str, + ) -> Result>, ProverError> { + unimplemented!("not required for tests") + } + async fn get_journal(&self, _proof_id: &str) -> Result>, ProverError> { + unimplemented!("not required for tests") + } + async fn compress(&self, _proof_id: &str) -> Result { + unimplemented!("not required for tests") + } + async fn get_compressed_receipt( + &self, + _proof_id: &str, + ) -> Result>, ProverError> { + unimplemented!("not required for tests") + } + } + + pub(crate) fn db_test_prover() -> ProverObj { + Arc::new(TestProver::new()) + } + #[sqlx::test] async fn add_order(pool: SqlitePool) { let db: DbObj = Arc::new(SqliteDb::from(pool).await.unwrap()); @@ -1165,37 +1313,70 @@ mod tests { #[sqlx::test] async fn set_order_failure(pool: SqlitePool) { let db: DbObj = Arc::new(SqliteDb::from(pool).await.unwrap()); - let order = create_order(); + let mut order = create_order(); + order.input_id = Some("input_failure".into()); db.add_order(&order).await.unwrap(); + let prover = TestProver::new(); + let deleted = prover.deleted_ids(); + let prover: ProverObj = Arc::new(prover); let failure_str = "TEST_FAIL"; - db.set_order_failure(&order.id(), failure_str).await.unwrap(); + db.set_order_failure(&order.id(), failure_str, &prover).await.unwrap(); let db_order = db.get_order(&order.id()).await.unwrap().unwrap(); assert_eq!(db_order.status, OrderStatus::Failed); assert_eq!(db_order.error_msg, Some(failure_str.into())); + assert!(db_order.input_id.is_none()); + assert_eq!(deleted.lock().unwrap().as_slice(), ["input_failure"]); } #[sqlx::test] async fn set_order_complete(pool: SqlitePool) { let db: DbObj = Arc::new(SqliteDb::from(pool).await.unwrap()); - let order = create_order(); + let mut order = create_order(); + order.input_id = Some("input_complete".into()); db.add_order(&order).await.unwrap(); + let prover = TestProver::new(); + let deleted = prover.deleted_ids(); + let prover: ProverObj = Arc::new(prover); - db.set_order_complete(&order.id()).await.unwrap(); + db.set_order_complete(&order.id(), &prover).await.unwrap(); let db_order = db.get_order(&order.id()).await.unwrap().unwrap(); assert_eq!(db_order.status, OrderStatus::Done); + assert!(db_order.input_id.is_none()); + assert_eq!(deleted.lock().unwrap().as_slice(), ["input_complete"]); } #[sqlx::test] async fn skip_order(pool: SqlitePool) { let db: DbObj = Arc::new(SqliteDb::from(pool).await.unwrap()); let order = create_order_request(); + let prover = TestProver::new(); + let deleted = prover.deleted_ids(); + let prover: ProverObj = Arc::new(prover); + + db.insert_skipped_request(&order, &prover).await.unwrap(); + let db_order = db.get_order(&order.id()).await.unwrap().unwrap(); + assert_eq!(db_order.status, OrderStatus::Skipped); + assert!(deleted.lock().unwrap().is_empty()); + } + + #[sqlx::test] + async fn insert_skipped_request_deletes_input(pool: SqlitePool) { + let db: DbObj = Arc::new(SqliteDb::from(pool).await.unwrap()); + let mut order = create_order_request(); + order.input_id = Some("input_skip".into()); + let prover = TestProver::new(); + let deleted = prover.deleted_ids(); + let prover: ProverObj = Arc::new(prover); + + db.insert_skipped_request(&order, &prover).await.unwrap(); - db.insert_skipped_request(&order).await.unwrap(); let db_order = db.get_order(&order.id()).await.unwrap().unwrap(); assert_eq!(db_order.status, OrderStatus::Skipped); + assert!(db_order.input_id.is_none()); + assert_eq!(deleted.lock().unwrap().as_slice(), ["input_skip"]); } #[sqlx::test] @@ -1642,13 +1823,14 @@ mod tests { // Skipped request ignores duplicates let order_request = create_order_request(); - db.insert_skipped_request(&order_request).await.unwrap(); + let prover = db_test_prover(); + db.insert_skipped_request(&order_request, &prover).await.unwrap(); let stored_order = db.get_order(&order_request.id()).await.unwrap().unwrap(); assert_eq!(stored_order.status, OrderStatus::Skipped); // Try to insert the same skipped request again - should be ignored - db.insert_skipped_request(&order_request).await.unwrap(); + db.insert_skipped_request(&order_request, &prover).await.unwrap(); assert!(logs_contain("already exists")); // Accepted request can overwrite skipped order diff --git a/crates/broker/src/lib.rs b/crates/broker/src/lib.rs index 7eb5807f3..40e505d00 100644 --- a/crates/broker/src/lib.rs +++ b/crates/broker/src/lib.rs @@ -286,7 +286,9 @@ impl OrderRequest { } fn to_skipped_order(&self) -> Order { - self.to_order(OrderStatus::Skipped) + let mut order = self.to_order(OrderStatus::Skipped); + order.input_id = None; + order } fn to_proving_order(&self, lock_price: U256) -> Order { @@ -970,6 +972,7 @@ where let order_monitor = Arc::new(order_monitor::OrderMonitor::new( self.db.clone(), + prover.clone(), self.provider.clone(), chain_monitor.clone(), config.clone(), diff --git a/crates/broker/src/order_monitor.rs b/crates/broker/src/order_monitor.rs index a239075f1..cbd87b50b 100644 --- a/crates/broker/src/order_monitor.rs +++ b/crates/broker/src/order_monitor.rs @@ -21,7 +21,7 @@ use crate::{ errors::CodedError, impl_coded_debug, now_timestamp, task::{RetryRes, RetryTask, SupervisorErr}, - utils, FulfillmentType, Order, + utils, FulfillmentType, Order, ProverObj, }; use alloy::{ network::Ethereum, @@ -146,6 +146,7 @@ pub struct RpcRetryConfig { #[derive(Clone)] pub struct OrderMonitor

{ db: DbObj, + prover: ProverObj, chain_monitor: Arc>, block_time: u64, config: ConfigLock, @@ -167,6 +168,7 @@ where #[allow(clippy::too_many_arguments)] pub fn new( db: DbObj, + prover: ProverObj, provider: Arc

, chain_monitor: Arc>, config: ConfigLock, @@ -207,6 +209,7 @@ where } let monitor = Self { db, + prover, chain_monitor, block_time, config, @@ -389,7 +392,7 @@ where /// Helper method to skip an order in the database and invalidate the appropriate cache async fn skip_order(&self, order: &OrderRequest, reason: &str) { - if let Err(e) = self.db.insert_skipped_request(order).await { + if let Err(e) = self.db.insert_skipped_request(order, &self.prover).await { tracing::error!("Failed to skip order ({}): {} - {e:?}", reason, order.id()); } @@ -554,7 +557,9 @@ where ); } } - if let Err(err) = self.db.insert_skipped_request(order).await { + if let Err(err) = + self.db.insert_skipped_request(order, &self.prover).await + { tracing::error!( "Failed to set DB failure state for order: {order_id} - {err:?}" ); @@ -993,7 +998,10 @@ where pub(crate) mod tests { use super::*; use crate::OrderStatus; - use crate::{db::SqliteDb, now_timestamp, FulfillmentType}; + use crate::{ + db::{tests::db_test_prover, SqliteDb}, + now_timestamp, FulfillmentType, + }; use alloy::node_bindings::AnvilInstance; use alloy::primitives::{address, Bytes}; use alloy::{ @@ -1156,8 +1164,10 @@ pub(crate) mod tests { let gas_priority_mode = Arc::new(tokio::sync::RwLock::new(PriorityMode::Medium)); + let prover = db_test_prover(); let monitor = OrderMonitor::new( db.clone(), + prover.clone(), provider.clone(), chain_monitor.clone(), config.clone(), diff --git a/crates/broker/src/order_picker.rs b/crates/broker/src/order_picker.rs index 49a8975a6..d3fac3aed 100644 --- a/crates/broker/src/order_picker.rs +++ b/crates/broker/src/order_picker.rs @@ -275,9 +275,10 @@ where tracing::info!("Order pricing cancelled during pricing for order {order_id}"); // Add the cancelled order to the database as skipped - if let Err(e) = self.db.insert_skipped_request(&order).await { + if let Err(e) = self.db.insert_skipped_request(&order, &self.prover).await { tracing::error!("Failed to add cancelled order to database: {e}"); } + return Ok(false); } }; @@ -337,17 +338,19 @@ where // Add the skipped order to the database self.db - .insert_skipped_request(&order) + .insert_skipped_request(&order, &self.prover) .await .context("Failed to add skipped order to database")?; + Ok(false) } Err(err) => { tracing::warn!("Failed to price order {order_id}: {err}"); self.db - .insert_skipped_request(&order) + .insert_skipped_request(&order, &self.prover) .await .context("Failed to skip failed priced order")?; + Ok(false) } } @@ -650,6 +653,11 @@ where } Err(err) => match err { ProverError::ProvingFailed(ref err_msg) => { + if let Err(e) = prover.delete_input(&input_id).await { + tracing::error!( + "Failed to delete input for skipped order {order_id_clone}: {e:?}" + ); + } if err_msg.contains("Session limit exceeded") || err_msg.contains("Execution stopped intentionally due to session limit") { tracing::debug!( @@ -2892,6 +2900,10 @@ pub(crate) mod tests { self.default_prover.upload_input(input).await } + async fn delete_input(&self, input_id: &str) -> Result<(), ProverError> { + self.default_prover.delete_input(input_id).await + } + async fn preflight( &self, image_id: &str, diff --git a/crates/broker/src/provers/bonsai.rs b/crates/broker/src/provers/bonsai.rs index cd6fbaa23..9592717e6 100644 --- a/crates/broker/src/provers/bonsai.rs +++ b/crates/broker/src/provers/bonsai.rs @@ -302,6 +302,14 @@ impl Prover for Bonsai { .await } + async fn delete_input(&self, input_id: &str) -> Result<(), ProverError> { + self.retry( + || async { Ok(self.client.input_delete(input_id).await.map(|_| ())?) }, + "delete input", + ) + .await + } + async fn preflight( &self, image_id: &str, diff --git a/crates/broker/src/provers/default.rs b/crates/broker/src/provers/default.rs index ee05667a4..23dfef1a4 100644 --- a/crates/broker/src/provers/default.rs +++ b/crates/broker/src/provers/default.rs @@ -146,6 +146,12 @@ impl Prover for DefaultProver { Ok(input_id) } + async fn delete_input(&self, input_id: &str) -> Result<(), ProverError> { + let mut inputs = self.state.inputs.write().await; + inputs.remove(input_id); + Ok(()) + } + async fn upload_image(&self, image_id: &str, image: Vec) -> Result<(), ProverError> { let mut images = self.state.images.write().await; images.insert(image_id.to_string(), image); diff --git a/crates/broker/src/provers/mod.rs b/crates/broker/src/provers/mod.rs index ba9ecf82d..95c538989 100644 --- a/crates/broker/src/provers/mod.rs +++ b/crates/broker/src/provers/mod.rs @@ -136,6 +136,7 @@ pub(crate) async fn verify_groth16_receipt( pub trait Prover { async fn has_image(&self, image_id: &str) -> Result; async fn upload_input(&self, input: Vec) -> Result; + async fn delete_input(&self, input_id: &str) -> Result<(), ProverError>; async fn upload_image(&self, image_id: &str, image: Vec) -> Result<(), ProverError>; async fn preflight( &self, diff --git a/crates/broker/src/proving.rs b/crates/broker/src/proving.rs index d9fb33bcd..2ffc2d19e 100644 --- a/crates/broker/src/proving.rs +++ b/crates/broker/src/proving.rs @@ -359,7 +359,7 @@ impl ProvingService { tracing::error!( "Failed to create stark session for order {order_id}: {proving_err:?}" ); - handle_order_failure(&self.db, &order_id, "Proving session create failed").await; + self.handle_order_failure(&order_id, "Proving session create failed").await; return; } }; @@ -392,7 +392,8 @@ impl ProvingService { tracing::info!( "Order {order_id} proof completed but order not actionable: {reason}" ); - handle_order_failure(&self.db, &order_id, reason).await; + + self.handle_order_failure(&order_id, reason).await; } Err(err) => { tracing::error!( @@ -402,7 +403,7 @@ impl ProvingService { proof_retry_count ); - handle_order_failure(&self.db, &order_id, "Proving failed").await; + self.handle_order_failure(&order_id, "Proving failed").await; } } } @@ -417,7 +418,7 @@ impl ProvingService { if order.proof_id.is_none() { tracing::error!("Order in status Proving missing proof_id: {order_id}"); - handle_order_failure(&self.db, &order_id, "Proving status missing proof_id").await; + self.handle_order_failure(&order_id, "Proving status missing proof_id").await; continue; } @@ -428,6 +429,14 @@ impl ProvingService { Ok(()) } + + async fn handle_order_failure(&self, order_id: &str, failure_reason: &'static str) { + if let Err(inner_err) = + self.db.set_order_failure(order_id, failure_reason, &self.prover).await + { + tracing::error!("Failed to set order {order_id} failure: {inner_err:?}"); + } + } } impl RetryTask for ProvingService { @@ -478,12 +487,6 @@ impl RetryTask for ProvingService { } } -async fn handle_order_failure(db: &DbObj, order_id: &str, failure_reason: &'static str) { - if let Err(inner_err) = db.set_order_failure(order_id, failure_reason).await { - tracing::error!("Failed to set order {order_id} failure: {inner_err:?}"); - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/broker/src/submitter.rs b/crates/broker/src/submitter.rs index 98bf2b810..b5b2eda3f 100644 --- a/crates/broker/src/submitter.rs +++ b/crates/broker/src/submitter.rs @@ -369,7 +369,9 @@ where if let Err(err) = res.await { tracing::error!("Failed to submit {order_id}: {err}"); - if let Err(db_err) = self.db.set_order_failure(order_id, "Failed to submit").await { + if let Err(db_err) = + self.db.set_order_failure(order_id, "Failed to submit", &self.prover).await + { tracing::error!("Failed to set order failure during proof submission: {order_id} {db_err:?}"); } } @@ -470,7 +472,7 @@ where for fulfillment in fulfillments.iter() { let order_id = fulfillment_to_order_id.get(&fulfillment.id).unwrap(); - if let Err(db_err) = self.db.set_order_complete(order_id).await { + if let Err(db_err) = self.db.set_order_complete(order_id, &self.prover).await { tracing::error!( "Failed to set order complete during proof submission: {:x} {db_err:?}", fulfillment.id @@ -517,8 +519,10 @@ where ) -> Result<(), SubmitterErr> { tracing::warn!("All orders in batch {batch_id} are expired ({}). Batch will not be submitted, and all orders will be marked as failed.", &orders.iter().map(|order| format!("{order}")).collect::>().join(", ")); for order in orders.clone() { - if let Err(db_err) = - self.db.set_order_failure(order.id().as_str(), "Failed to submit batch").await + if let Err(db_err) = self + .db + .set_order_failure(order.id().as_str(), "Failed to submit batch", &self.prover) + .await { tracing::error!( "Failed to set order failure during proof submission: {} {db_err:?}", @@ -540,7 +544,8 @@ where ) -> Result<(), SubmitterErr> { tracing::warn!("Failed to submit proofs for batch {batch_id}: {err:?} "); for (fulfillment, order_id) in fulfillments.iter().zip(order_ids.iter()) { - if let Err(db_err) = self.db.set_order_failure(order_id, "Failed to submit batch").await + if let Err(db_err) = + self.db.set_order_failure(order_id, "Failed to submit batch", &self.prover).await { tracing::error!( "Failed to set order failure during proof submission: {:x} {db_err:?}", diff --git a/crates/broker/src/utils.rs b/crates/broker/src/utils.rs index edf440360..48e5bdafe 100644 --- a/crates/broker/src/utils.rs +++ b/crates/broker/src/utils.rs @@ -83,7 +83,7 @@ pub(crate) async fn cancel_proof_and_fail_order( // TODO in the case of a failure to cancel, the estimated capacity will be incorrect. Still // setting the order as failed to avoid infinite loops of cancellations. - if let Err(err) = db.set_order_failure(&order_id, failure_reason).await { + if let Err(err) = db.set_order_failure(&order_id, failure_reason, prover).await { tracing::error!( "Failed to set order {order_id} as failed for reason {failure_reason}: {err}", );