Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions bento/crates/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use axum::{
extract::{FromRequestParts, Host, Path, State},
http::{StatusCode, request::Parts},
response::{IntoResponse, Response},
routing::{get, post, put},
routing::{delete, get, post, put},
};
use bonsai_sdk::responses::{
CreateSessRes, ImgUploadRes, ProofReq, ReceiptDownload, SessionStats, SessionStatusRes,
Expand Down Expand Up @@ -118,6 +118,9 @@ pub enum AppError {
#[error("Database error")]
DbError(#[from] TaskDbErr),

#[error("The input does not exist: {0}")]
InputMissing(String),

#[error("internal error")]
InternalErr(AnyhowErr),
}
Expand All @@ -133,6 +136,7 @@ impl AppError {
Self::ReceiptMissing(_) => "ReceiptMissing",
Self::JournalMissing(_) => "JournalMissing",
Self::DbError(_) => "DbError",
Self::InputMissing(_) => "InputMissing",
Self::InternalErr(_) => "InternalErr",
}
.into()
Expand All @@ -152,7 +156,9 @@ impl IntoResponse for AppError {
Self::ImgAlreadyExists(_)
| Self::InputAlreadyExists(_)
| Self::ReceiptAlreadyExists(_) => StatusCode::NO_CONTENT,
Self::ReceiptMissing(_) | Self::JournalMissing(_) => StatusCode::NOT_FOUND,
Self::ReceiptMissing(_) | Self::JournalMissing(_) | Self::InputMissing(_) => {
StatusCode::NOT_FOUND
}
Self::InternalErr(_) | Self::DbError(_) => StatusCode::INTERNAL_SERVER_ERROR,
};

Expand Down Expand Up @@ -335,6 +341,30 @@ async fn input_upload(
}))
}

const INPUT_DELETE_PATH: &str = "/inputs/:input_id";
async fn input_delete(
State(state): State<Arc<AppState>>,
Path(input_id): Path<String>,
) -> Result<(), AppError> {
let input_key = format!("{INPUT_BUCKET_DIR}/{input_id}");
if !state
.s3_client
.object_exists(&input_key)
.await
.context("Failed to check if object exists")?
{
return Err(AppError::InputMissing(input_id.to_string()));
}

state
.s3_client
.object_delete(&input_key)
.await
.context("Failed to delete input from object store")?;

Ok(())
}

const INPUT_UPLOAD_PUT_PATH: &str = "/inputs/upload/:input_id";
async fn input_upload_put(
State(state): State<Arc<AppState>>,
Expand Down Expand Up @@ -839,6 +869,7 @@ pub fn app(state: Arc<AppState>) -> Router {
.route(IMAGE_UPLOAD_PATH, get(image_upload))
.route(IMAGE_UPLOAD_PATH, put(image_upload_put))
.route(INPUT_UPLOAD_PATH, get(input_upload))
.route(INPUT_DELETE_PATH, delete(input_delete))
.route(INPUT_UPLOAD_PUT_PATH, put(input_upload_put))
.route(RECEIPT_UPLOAD_PATH, get(receipt_upload))
.route(RECEIPT_UPLOAD_PUT_PATH, put(receipt_upload_put))
Expand Down
5 changes: 5 additions & 0 deletions bento/crates/workflow-common/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ impl S3Client {
}
}

pub async fn object_delete(&self, key: &str) -> Result<()> {
self.client.delete_object().bucket(&self.bucket).key(key).send().await?;
Ok(())
}

/// List objects in the bucket with optional prefix
pub async fn list_objects(&self, prefix: Option<&str>) -> Result<Vec<String>> {
let mut objects = Vec::new();
Expand Down
6 changes: 4 additions & 2 deletions crates/broker/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,10 @@ impl AggregatorService {
order.order_id
);

if let Err(err) =
self.db.set_order_failure(&order.order_id, "Expired before aggregation").await
if let Err(err) = self
.db
.set_order_failure(&order.order_id, "Expired before aggregation", &self.prover)
.await
{
tracing::error!(
"Failed to set order {} as failed before aggregation: {err}",
Expand Down
8 changes: 5 additions & 3 deletions crates/broker/src/db/fuzz_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tokio::runtime::Builder;
use crate::FulfillmentType;
use crate::{db::AggregationOrder, AggregationState, Order, OrderStatus};

use super::{BrokerDb, SqliteDb};
use super::{tests::db_test_prover, BrokerDb, SqliteDb};

use boundless_market::contracts::{
Offer, Predicate, ProofRequest, RequestId, RequestInput, RequestInputType, Requirements,
Expand Down Expand Up @@ -153,6 +153,7 @@ proptest! {
let db: Arc<dyn BrokerDb + Send + Sync> = Arc::new(
SqliteDb::new(&db_path).await.unwrap()
);
let prover = db_test_prover();

// Create state tracking structure
let state = TestState {
Expand All @@ -165,6 +166,7 @@ proptest! {

for ops in operations.chunks(12) {
let db = db.clone();
let prover = prover.clone();
let ops = ops.to_vec();
let state = TestState {
added_orders: state.added_orders.clone(),
Expand Down Expand Up @@ -196,10 +198,10 @@ proptest! {
db.get_order(id).await.unwrap();
},
ExistingOrderOperation::SetOrderComplete => {
db.set_order_complete(id).await.unwrap();
db.set_order_complete(id, &prover).await.unwrap();
},
ExistingOrderOperation::SetOrderFailure => {
db.set_order_failure(id, "test").await.unwrap();
db.set_order_failure(id, "test", &prover).await.unwrap();
},
ExistingOrderOperation::SetOrderProofId { proof_id } => {
db.set_order_proof_id(id, &proof_id).await.unwrap();
Expand Down
Loading
Loading