From 263c533f629a55fc190075905c5fc7a865451579 Mon Sep 17 00:00:00 2001 From: Jorge Antonio Date: Tue, 31 Dec 2024 15:12:16 +0000 Subject: [PATCH 1/7] add changes --- atoma-proxy/src/server/handlers/embeddings.rs | 16 ++++------------ .../src/server/handlers/image_generations.rs | 4 ++-- atoma-proxy/src/server/middleware.rs | 2 +- atoma-proxy/src/server/types.rs | 5 ++++- 4 files changed, 11 insertions(+), 16 deletions(-) diff --git a/atoma-proxy/src/server/handlers/embeddings.rs b/atoma-proxy/src/server/handlers/embeddings.rs index ca3518c5..dd025223 100644 --- a/atoma-proxy/src/server/handlers/embeddings.rs +++ b/atoma-proxy/src/server/handlers/embeddings.rs @@ -273,18 +273,10 @@ pub async fn confidential_embeddings_create( // with a "total_tokens" field, which correctly specifies the number of total tokens // processed by the node, as the latter is running within a TEE. let total_tokens = response - .get("total_tokens") - .map(|u| { - u.as_u64().ok_or_else(|| AtomaProxyError::InternalError { - message: "Failed to get total tokens".to_string(), - endpoint: metadata.endpoint.clone(), - }) - }) - .transpose() - .map_err(|e| AtomaProxyError::InternalError { - message: format!("Failed to get total tokens: {}", e), - endpoint: metadata.endpoint.clone(), - })? + .get("usage") + .and_then(|usage| usage.get("total_tokens")) + .and_then(|total_tokens| total_tokens.as_u64()) + .map(|n| n as i64) .unwrap_or(0); update_state_manager( &state.state_manager_sender, diff --git a/atoma-proxy/src/server/handlers/image_generations.rs b/atoma-proxy/src/server/handlers/image_generations.rs index 6a61d4e9..92e18500 100644 --- a/atoma-proxy/src/server/handlers/image_generations.rs +++ b/atoma-proxy/src/server/handlers/image_generations.rs @@ -249,8 +249,8 @@ pub async fn confidential_image_generations_create( .await { Ok(response) => { - // NOTE: At this point, we do not need to update the stack num tokens, - // because the image generation response was correctly generated. + // NOTE: At this point, we do not need to update the stack num compute units, + // because the image generation response was correctly generated by a TEE node. Ok(response.into_response()) } Err(e) => { diff --git a/atoma-proxy/src/server/middleware.rs b/atoma-proxy/src/server/middleware.rs index 50ab60fd..c72c3b3a 100644 --- a/atoma-proxy/src/server/middleware.rs +++ b/atoma-proxy/src/server/middleware.rs @@ -420,7 +420,7 @@ pub async fn confidential_compute_middleware( let num_compute_units = if endpoint == CONFIDENTIAL_IMAGE_GENERATIONS_PATH { confidential_compute_request - .max_tokens + .num_compute_units .unwrap_or(DEFAULT_IMAGE_RESOLUTION) as i64 } else { MAX_NUM_TOKENS_FOR_CONFIDENTIAL_COMPUTE diff --git a/atoma-proxy/src/server/types.rs b/atoma-proxy/src/server/types.rs index c8844fec..79fab1e4 100644 --- a/atoma-proxy/src/server/types.rs +++ b/atoma-proxy/src/server/types.rs @@ -19,6 +19,9 @@ pub struct ConfidentialComputeRequest { /// Client's public key for Diffie-Hellman key exchange (base64 encoded) pub client_dh_public_key: String, + /// Node's public key for Diffie-Hellman key exchange (base64 encoded) + pub node_dh_public_key: String, + /// Hash of the original plaintext body for integrity verification (base64 encoded) pub plaintext_body_hash: String, @@ -30,5 +33,5 @@ pub struct ConfidentialComputeRequest { /// Number of compute units to be used for the request, for image generations, /// as this value is known in advance (the number of pixels to generate) - pub max_tokens: Option, + pub num_compute_units: Option, } From a7444c7c2a3f635542cfa94ae3598925bb2cdcbd Mon Sep 17 00:00:00 2001 From: Jorge Antonio Date: Tue, 31 Dec 2024 17:49:23 +0000 Subject: [PATCH 2/7] add changes --- atoma-proxy/src/server/handlers/mod.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/atoma-proxy/src/server/handlers/mod.rs b/atoma-proxy/src/server/handlers/mod.rs index 0182bd62..4849ccc1 100644 --- a/atoma-proxy/src/server/handlers/mod.rs +++ b/atoma-proxy/src/server/handlers/mod.rs @@ -1,5 +1,6 @@ use atoma_state::types::AtomaAtomaStateManagerEvent; use flume::Sender; +use tracing::instrument; use super::error::AtomaProxyError; use crate::server::Result; @@ -34,6 +35,11 @@ pub mod select_node_public_key; /// This function will return an error if: /// - The state manager channel is closed /// - Either update operation fails to complete +#[instrument( + level = "info", + skip_all, + fields(stack_small_id, estimated_total_tokens, total_tokens, endpoint) +)] pub fn update_state_manager( state_manager_sender: &Sender, stack_small_id: i64, From d4903a88365de4e292e4b299ab1b494170e712fe Mon Sep 17 00:00:00 2001 From: Jorge Antonio Date: Wed, 1 Jan 2025 11:28:18 +0000 Subject: [PATCH 3/7] first commit --- Cargo.lock | 1 + Cargo.toml | 2 +- atoma-auth/src/sui/mod.rs | 9 ++ atoma-proxy/Cargo.toml | 1 + .../src/server/handlers/chat_completions.rs | 12 +- atoma-proxy/src/server/handlers/embeddings.rs | 13 +- .../src/server/handlers/image_generations.rs | 10 +- atoma-proxy/src/server/handlers/mod.rs | 134 ++++++++++++++++++ atoma-proxy/src/server/streamer.rs | 17 ++- 9 files changed, 189 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b6598dbc..2ba98d99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -604,6 +604,7 @@ dependencies = [ "blake2", "clap", "config", + "fastcrypto 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "flume", "futures", "hf-hub", diff --git a/Cargo.toml b/Cargo.toml index d9bf39da..89735b84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ chrono = "0.4.38" clap = "4.5.20" config = "0.14.1" dcap-qvl = "0.1.6" +fastcrypto = "0.1.8" flume = "0.11.1" futures = "0.3.31" hf-hub = "0.3.2" @@ -53,5 +54,4 @@ utoipa-swagger-ui = "8.0.3" uuid = "1.11.0" x25519-dalek = "2.0.1" zeroize = "1.8.1" -fastcrypto = "0.1.8" tower-http = "0.6.2" diff --git a/atoma-auth/src/sui/mod.rs b/atoma-auth/src/sui/mod.rs index cb30a605..ae642fe3 100644 --- a/atoma-auth/src/sui/mod.rs +++ b/atoma-auth/src/sui/mod.rs @@ -215,6 +215,15 @@ impl Sui { Ok(signature.encode_base64()) } + /// Get the underlying keystore + /// + /// # Returns + /// + /// Returns the keystore. + pub fn get_keystore(&self) -> &Keystore { + &self.wallet_ctx.config.keystore + } + /// Sign a hash using the wallet's private key /// /// # Arguments diff --git a/atoma-proxy/Cargo.toml b/atoma-proxy/Cargo.toml index 782361e4..c736031a 100644 --- a/atoma-proxy/Cargo.toml +++ b/atoma-proxy/Cargo.toml @@ -17,6 +17,7 @@ base64 = { workspace = true } blake2.workspace = true clap.workspace = true config.workspace = true +fastcrypto.workspace = true flume.workspace = true futures = { workspace = true } hf-hub = { workspace = true } diff --git a/atoma-proxy/src/server/handlers/chat_completions.rs b/atoma-proxy/src/server/handlers/chat_completions.rs index 68d83986..18d395e5 100644 --- a/atoma-proxy/src/server/handlers/chat_completions.rs +++ b/atoma-proxy/src/server/handlers/chat_completions.rs @@ -16,7 +16,7 @@ use tracing::instrument; use utoipa::{OpenApi, ToSchema}; use super::request_model::RequestModel; -use super::update_state_manager; +use super::{update_state_manager, verify_and_sign_response, PROXY_SIGNATURE_KEY}; use crate::server::Result; /// Path for the confidential chat completions endpoint. @@ -464,7 +464,7 @@ async fn handle_non_streaming_response( let client = reqwest::Client::new(); let time = Instant::now(); - let response = client + let mut response = client .post(format!("{}{}", node_address, endpoint)) .headers(headers) .json(&payload) @@ -504,6 +504,12 @@ async fn handle_non_streaming_response( .map(|n| n as i64) .unwrap_or(0); + let guard = state.sui.read().await; + let keystore = guard.get_keystore(); + let proxy_signature = verify_and_sign_response(&response.0, keystore)?; + + response[PROXY_SIGNATURE_KEY] = Value::String(proxy_signature); + state .state_manager_sender .send( @@ -622,12 +628,14 @@ async fn handle_streaming_response( let stream = response.bytes_stream(); + let guard = state.sui.read().await; // Create the SSE stream let stream = Sse::new(Streamer::new( stream, state.state_manager_sender.clone(), selected_stack_small_id, estimated_total_tokens, + guard, start, node_id, model_name, diff --git a/atoma-proxy/src/server/handlers/embeddings.rs b/atoma-proxy/src/server/handlers/embeddings.rs index dd025223..6c271864 100644 --- a/atoma-proxy/src/server/handlers/embeddings.rs +++ b/atoma-proxy/src/server/handlers/embeddings.rs @@ -19,7 +19,10 @@ use crate::server::{ types::ConfidentialComputeRequest, }; -use super::{request_model::RequestModel, update_state_manager}; +use super::{ + request_model::RequestModel, update_state_manager, verify_and_sign_response, + PROXY_SIGNATURE_KEY, +}; use crate::server::Result; /// Path for the confidential embeddings endpoint. @@ -349,7 +352,7 @@ async fn handle_embeddings_response( let client = reqwest::Client::new(); let time = Instant::now(); // Send the request to the AI node - let response = client + let mut response = client .post(format!("{}{}", node_address, endpoint)) .headers(headers) .json(&payload) @@ -366,6 +369,12 @@ async fn handle_embeddings_response( endpoint: endpoint.to_string(), })?; + let guard = state.sui.read().await; + let keystore = guard.get_keystore(); + let proxy_signature = verify_and_sign_response(&response, keystore)?; + + response[PROXY_SIGNATURE_KEY] = Value::String(proxy_signature); + let num_input_compute_units = if endpoint == CONFIDENTIAL_EMBEDDINGS_PATH { response .get("total_tokens") diff --git a/atoma-proxy/src/server/handlers/image_generations.rs b/atoma-proxy/src/server/handlers/image_generations.rs index 92e18500..3f1caa53 100644 --- a/atoma-proxy/src/server/handlers/image_generations.rs +++ b/atoma-proxy/src/server/handlers/image_generations.rs @@ -16,7 +16,7 @@ use crate::server::types::ConfidentialComputeRequest; use crate::server::{http_server::ProxyState, middleware::RequestMetadataExtension}; use super::request_model::RequestModel; -use super::update_state_manager; +use super::{update_state_manager, verify_and_sign_response, PROXY_SIGNATURE_KEY}; use crate::server::Result; /// Path for the confidential image generations endpoint. @@ -318,7 +318,7 @@ async fn handle_image_generation_response( let client = reqwest::Client::new(); let time = Instant::now(); // Send the request to the AI node - let response = client + let mut response = client .post(format!("{}{}", node_address, endpoint)) .headers(headers) .json(&payload) @@ -336,6 +336,12 @@ async fn handle_image_generation_response( }) .map(Json)?; + let guard = state.sui.read().await; + let keystore = guard.get_keystore(); + let proxy_signature = verify_and_sign_response(&response.0, keystore)?; + + response[PROXY_SIGNATURE_KEY] = Value::String(proxy_signature); + // Update the node throughput performance state .state_manager_sender diff --git a/atoma-proxy/src/server/handlers/mod.rs b/atoma-proxy/src/server/handlers/mod.rs index 4849ccc1..a9bfa10b 100644 --- a/atoma-proxy/src/server/handlers/mod.rs +++ b/atoma-proxy/src/server/handlers/mod.rs @@ -1,5 +1,16 @@ +use std::str::FromStr; + use atoma_state::types::AtomaAtomaStateManagerEvent; +use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _}; +use fastcrypto::{ + ed25519::{Ed25519PublicKey, Ed25519Signature}, + secp256k1::{Secp256k1PublicKey, Secp256k1Signature}, + secp256r1::{Secp256r1PublicKey, Secp256r1Signature}, + traits::{ToFromBytes, VerifyingKey}, +}; use flume::Sender; +use sui_keys::keystore::{AccountKeystore, Keystore}; +use sui_sdk::types::crypto::{PublicKey, Signature, SignatureScheme, SuiSignature}; use tracing::instrument; use super::error::AtomaProxyError; @@ -11,6 +22,15 @@ pub mod image_generations; pub mod request_model; pub mod select_node_public_key; +/// Key for the proxy signature in the payload +pub const PROXY_SIGNATURE_KEY: &str = "proxy_signature"; + +/// Key for the response hash in the payload +const RESPONSE_HASH_KEY: &str = "response_hash"; + +/// Key for the signature in the payload +const SIGNATURE_KEY: &str = "signature"; + /// Updates the state manager with token usage and hash information for a stack. /// /// This function performs two main operations: @@ -60,3 +80,117 @@ pub fn update_state_manager( })?; Ok(()) } + +/// Verifies a Sui signature and creates a new signature using the proxy's key +/// +/// # Arguments +/// +/// * `payload` - JSON payload containing the response hash and its signature +/// * `node_public_key` - Public key of the node that signed the response +/// * `proxy_keystore` - Keystore containing the proxy's signing key +/// +/// # Returns +/// +/// Returns `Ok(String)` with the new signature if verification succeeds, +/// or an error if verification fails or signing fails +/// +/// # Errors +/// +/// This function will return an error if: +/// - The payload format is invalid +/// - The signature verification fails +/// - Creating the new signature fails +#[instrument(level = "debug", skip_all)] +pub fn verify_and_sign_response( + payload: &serde_json::Value, + keystore: &Keystore, +) -> Result { + // Extract response hash and signature from payload + let response_hash = + payload[RESPONSE_HASH_KEY] + .as_str() + .ok_or_else(|| AtomaProxyError::InternalError { + message: "Missing response_hash in payload".to_string(), + endpoint: "verify_signature".to_string(), + })?; + + let node_signature = + payload[SIGNATURE_KEY] + .as_str() + .ok_or_else(|| AtomaProxyError::InternalError { + message: "Missing signature in payload".to_string(), + endpoint: "verify_signature".to_string(), + })?; + + let signature = + Signature::from_str(node_signature).map_err(|e| AtomaProxyError::InternalError { + message: format!("Failed to create signature: {}", e), + endpoint: "verify_signature".to_string(), + })?; + + let public_key_bytes = signature.public_key_bytes(); + let public_key = + PublicKey::try_from_bytes(signature.scheme(), public_key_bytes).map_err(|e| { + AtomaProxyError::InternalError { + message: format!("Failed to create public key: {}", e), + endpoint: "verify_signature".to_string(), + } + })?; + + match signature.scheme() { + SignatureScheme::ED25519 => { + let public_key = Ed25519PublicKey::from_bytes(public_key.as_ref()).unwrap(); + let signature = Ed25519Signature::from_bytes(signature.as_ref()).unwrap(); + public_key + .verify(response_hash.as_bytes(), &signature) + .map_err(|e| AtomaProxyError::InternalError { + message: format!("Failed to verify signature: {}", e), + endpoint: "verify_signature".to_string(), + })?; + } + SignatureScheme::Secp256k1 => { + let public_key = Secp256k1PublicKey::from_bytes(public_key.as_ref()).unwrap(); + let signature = Secp256k1Signature::from_bytes(signature.as_ref()).unwrap(); + public_key + .verify(response_hash.as_bytes(), &signature) + .map_err(|_| AtomaProxyError::InternalError { + message: "Failed to verify signature".to_string(), + endpoint: "verify_signature".to_string(), + })?; + } + SignatureScheme::Secp256r1 => { + let public_key = Secp256r1PublicKey::from_bytes(public_key.as_ref()).unwrap(); + let signature = Secp256r1Signature::from_bytes(signature.as_ref()).unwrap(); + public_key + .verify(response_hash.as_bytes(), &signature) + .map_err(|_| AtomaProxyError::InternalError { + message: "Failed to verify signature".to_string(), + endpoint: "verify_signature".to_string(), + })?; + } + _ => { + return Err(AtomaProxyError::InternalError { + message: "Currently unsupported signature scheme".to_string(), + endpoint: "verify_signature".to_string(), + }); + } + } + + // Sign with proxy's key + let proxy_signature = match keystore { + Keystore::File(keystore) => keystore + .sign_hashed(&keystore.addresses()[0], response_hash.as_bytes()) + .map_err(|e| AtomaProxyError::InternalError { + message: format!("Failed to create proxy signature: {}", e), + endpoint: "verify_signature".to_string(), + })?, + Keystore::InMem(keystore) => keystore + .sign_hashed(&keystore.addresses()[0], response_hash.as_bytes()) + .map_err(|e| AtomaProxyError::InternalError { + message: format!("Failed to create proxy signature: {}", e), + endpoint: "verify_signature".to_string(), + })?, + }; + // Convert signature to base64 + Ok(BASE64.encode(proxy_signature.as_ref())) +} diff --git a/atoma-proxy/src/server/streamer.rs b/atoma-proxy/src/server/streamer.rs index 3bc6e8dc..3d2a9253 100644 --- a/atoma-proxy/src/server/streamer.rs +++ b/atoma-proxy/src/server/streamer.rs @@ -1,9 +1,11 @@ use std::{ pin::Pin, + sync::Arc, task::{Context, Poll}, time::Instant, }; +use atoma_auth::Sui; use atoma_state::types::AtomaAtomaStateManagerEvent; use axum::body::Bytes; use axum::{response::sse::Event, Error}; @@ -12,10 +14,13 @@ use futures::Stream; use reqwest; use serde_json::Value; use sqlx::types::chrono::{DateTime, Utc}; +use tokio::sync::RwLockReadGuard; use tracing::{error, instrument}; use crate::server::handlers::{chat_completions::CHAT_COMPLETIONS_PATH, update_state_manager}; +use super::handlers::verify_and_sign_response; + /// The chunk that indicates the end of a streaming response const DONE_CHUNK: &str = "[DONE]"; @@ -32,13 +37,15 @@ const CHOICES: &str = "choices"; const USAGE: &str = "usage"; /// A structure for streaming chat completion chunks. -pub struct Streamer { +pub struct Streamer<'a> { /// The stream of bytes currently being processed stream: Pin> + Send>>, /// Current status of the stream status: StreamStatus, /// Estimated total tokens for the stream estimated_total_tokens: i64, + /// Keystore + keystore: RwLockReadGuard<'a, Sui>, /// Stack small id stack_small_id: i64, /// State manager sender @@ -68,7 +75,7 @@ pub enum StreamStatus { Failed(String), } -impl Streamer { +impl<'a> Streamer<'a> { /// Creates a new Streamer instance #[allow(clippy::too_many_arguments)] pub fn new( @@ -76,6 +83,7 @@ impl Streamer { state_manager_sender: Sender, stack_small_id: i64, estimated_total_tokens: i64, + keystore: RwLockReadGuard<'a, Sui>, start: Instant, node_id: i64, model_name: String, @@ -85,6 +93,7 @@ impl Streamer { stream: Box::pin(stream), status: StreamStatus::NotStarted, estimated_total_tokens, + keystore, stack_small_id, state_manager_sender, start, @@ -227,7 +236,7 @@ impl Streamer { } } -impl Stream for Streamer { +impl<'a> Stream for Streamer<'a> { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -313,6 +322,8 @@ impl Stream for Streamer { } } else if let Some(usage) = chunk.get(USAGE) { self.status = StreamStatus::Completed; + verify_and_sign_response(&chunk, self.keystore.get_keystore()) + .map_err(|e| Error::new(e.to_string()))?; self.handle_final_chunk(usage)?; } From 5f25fa65da589155ccc3d1291c337d4330c47db2 Mon Sep 17 00:00:00 2001 From: chad Date: Mon, 10 Feb 2025 13:26:58 -0500 Subject: [PATCH 4/7] fix: use blocking reads for signature verification --- .../src/server/handlers/chat_completions.rs | 3 +-- atoma-proxy/src/server/streamer.rs | 22 +++++++++++-------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/atoma-proxy/src/server/handlers/chat_completions.rs b/atoma-proxy/src/server/handlers/chat_completions.rs index 815d7219..7f7a50a2 100644 --- a/atoma-proxy/src/server/handlers/chat_completions.rs +++ b/atoma-proxy/src/server/handlers/chat_completions.rs @@ -710,14 +710,13 @@ async fn handle_streaming_response( let stream = response.bytes_stream(); - let guard = state.sui.read().await; // Create the SSE stream let stream = Sse::new(Streamer::new( stream, state.state_manager_sender.clone(), selected_stack_small_id, estimated_total_tokens, - guard, + state.sui.clone(), start, node_id, model_name, diff --git a/atoma-proxy/src/server/streamer.rs b/atoma-proxy/src/server/streamer.rs index 65ce6260..38132927 100644 --- a/atoma-proxy/src/server/streamer.rs +++ b/atoma-proxy/src/server/streamer.rs @@ -10,11 +10,12 @@ use futures::Stream; use reqwest; use serde_json::Value; use sqlx::types::chrono::{DateTime, Utc}; -use tokio::sync::RwLockReadGuard; +use tokio::sync::RwLock; use crate::server::handlers::{chat_completions::CHAT_COMPLETIONS_PATH, update_state_manager}; use super::handlers::verify_and_sign_response; +use std::sync::Arc; use std::{ pin::Pin, task::{Context, Poll}, @@ -43,7 +44,7 @@ const CHOICES: &str = "choices"; const USAGE: &str = "usage"; /// A structure for streaming chat completion chunks. -pub struct Streamer<'a> { +pub struct Streamer { /// The stream of bytes currently being processed stream: Pin> + Send>>, /// Current status of the stream @@ -51,7 +52,7 @@ pub struct Streamer<'a> { /// Estimated total tokens for the stream estimated_total_tokens: i64, /// Keystore - keystore: RwLockReadGuard<'a, Sui>, + sui: Arc>, /// Stack small id stack_small_id: i64, /// State manager sender @@ -85,7 +86,7 @@ pub enum StreamStatus { Failed(String), } -impl<'a> Streamer<'a> { +impl Streamer { /// Creates a new Streamer instance #[allow(clippy::too_many_arguments)] pub fn new( @@ -93,7 +94,7 @@ impl<'a> Streamer<'a> { state_manager_sender: Sender, stack_small_id: i64, estimated_total_tokens: i64, - keystore: RwLockReadGuard<'a, Sui>, + sui: Arc>, start: Instant, node_id: i64, model_name: String, @@ -103,7 +104,7 @@ impl<'a> Streamer<'a> { stream: Box::pin(stream), status: StreamStatus::NotStarted, estimated_total_tokens, - keystore, + sui, stack_small_id, state_manager_sender, start, @@ -274,7 +275,7 @@ impl<'a> Streamer<'a> { } } -impl<'a> Stream for Streamer<'a> { +impl Stream for Streamer { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -461,8 +462,11 @@ impl<'a> Stream for Streamer<'a> { } } else if let Some(usage) = chunk.get(USAGE) { self.status = StreamStatus::Completed; - verify_and_sign_response(&chunk, verify_hash, self.keystore.get_keystore()) - .map_err(|e| Error::new(e.to_string()))?; + let _ = { + let guard = self.sui.blocking_read(); + verify_and_sign_response(&chunk, verify_hash, guard.get_keystore()) + .map_err(|e| Error::new(e.to_string()))? + }; // guard is dropped immediately after signature is created self.handle_final_chunk(usage)?; } From 2fa5bd4ab99b016f6867241181786ee29670a87d Mon Sep 17 00:00:00 2001 From: chad Date: Mon, 10 Feb 2025 13:27:11 -0500 Subject: [PATCH 5/7] fix: use blocking reads for signature verification --- atoma-proxy/src/server/handlers/chat_completions.rs | 3 ++- atoma-proxy/src/server/handlers/embeddings.rs | 4 +++- atoma-proxy/src/server/handlers/image_generations.rs | 3 ++- atoma-proxy/src/server/handlers/mod.rs | 4 ++-- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/atoma-proxy/src/server/handlers/chat_completions.rs b/atoma-proxy/src/server/handlers/chat_completions.rs index 7f7a50a2..c376413e 100644 --- a/atoma-proxy/src/server/handlers/chat_completions.rs +++ b/atoma-proxy/src/server/handlers/chat_completions.rs @@ -489,6 +489,7 @@ pub fn confidential_chat_completions_create_stream( ) )] #[allow(clippy::too_many_arguments)] +#[allow(clippy::significant_drop_tightening)] async fn handle_non_streaming_response( state: &ProxyState, node_address: &String, @@ -554,7 +555,7 @@ async fn handle_non_streaming_response( let verify_hash = endpoint != CONFIDENTIAL_CHAT_COMPLETIONS_PATH; - let guard = state.sui.read().await; + let guard: tokio::sync::RwLockReadGuard<'_, atoma_auth::Sui> = state.sui.blocking_read(); let keystore = guard.get_keystore(); let proxy_signature = verify_and_sign_response(&response.0, verify_hash, keystore)?; diff --git a/atoma-proxy/src/server/handlers/embeddings.rs b/atoma-proxy/src/server/handlers/embeddings.rs index ab0de3c6..c37b5ae7 100644 --- a/atoma-proxy/src/server/handlers/embeddings.rs +++ b/atoma-proxy/src/server/handlers/embeddings.rs @@ -355,6 +355,7 @@ pub async fn confidential_embeddings_create( ) )] #[allow(clippy::too_many_arguments)] +#[allow(clippy::significant_drop_tightening)] async fn handle_embeddings_response( state: &ProxyState, node_address: String, @@ -402,7 +403,8 @@ async fn handle_embeddings_response( message: format!("Failed to parse embeddings response: {err:?}"), endpoint: endpoint.to_string(), })?; - let guard = state.sui.read().await; + + let guard = state.sui.blocking_read(); let keystore = guard.get_keystore(); let verify_hash = endpoint != CONFIDENTIAL_EMBEDDINGS_PATH; diff --git a/atoma-proxy/src/server/handlers/image_generations.rs b/atoma-proxy/src/server/handlers/image_generations.rs index 1c84292a..ab6e09ce 100644 --- a/atoma-proxy/src/server/handlers/image_generations.rs +++ b/atoma-proxy/src/server/handlers/image_generations.rs @@ -335,6 +335,7 @@ pub async fn confidential_image_generations_create( ) )] #[allow(clippy::too_many_arguments)] +#[allow(clippy::significant_drop_tightening)] async fn handle_image_generation_response( state: &ProxyState, node_address: String, @@ -381,7 +382,7 @@ async fn handle_image_generation_response( }) .map(Json)?; - let guard = state.sui.read().await; + let guard = state.sui.blocking_read(); let keystore = guard.get_keystore(); let verify_hash = endpoint != CONFIDENTIAL_IMAGE_GENERATIONS_PATH; diff --git a/atoma-proxy/src/server/handlers/mod.rs b/atoma-proxy/src/server/handlers/mod.rs index 1ba5497c..8d099d88 100644 --- a/atoma-proxy/src/server/handlers/mod.rs +++ b/atoma-proxy/src/server/handlers/mod.rs @@ -233,13 +233,13 @@ pub fn verify_and_sign_response( Keystore::File(keystore) => keystore .sign_hashed(&keystore.addresses()[0], &response_hash_bytes) .map_err(|e| AtomaProxyError::InternalError { - message: format!("Failed to create proxy signature: {}", e), + message: format!("Failed to create proxy signature: {e}"), endpoint: "verify_signature".to_string(), })?, Keystore::InMem(keystore) => keystore .sign_hashed(&keystore.addresses()[0], &response_hash_bytes) .map_err(|e| AtomaProxyError::InternalError { - message: format!("Failed to create proxy signature: {}", e), + message: format!("Failed to create proxy signature: {e}"), endpoint: "verify_signature".to_string(), })?, }; From 70fc3bfbc8d0632f9c38c29f3cbc8e48d0deb831 Mon Sep 17 00:00:00 2001 From: chad Date: Mon, 24 Mar 2025 17:51:59 -0500 Subject: [PATCH 6/7] build: remove unnecessary formatting from .tomls --- Cargo.toml | 138 +++++++++++++++++++++--------------------- atoma-auth/Cargo.toml | 62 +++++++++---------- 2 files changed, 101 insertions(+), 99 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b611eff2..cfea400e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,77 +1,79 @@ [workspace] -members = [ "atoma-auth", "atoma-proxy", "atoma-proxy-service", "atoma-state" ] resolver = "2" +members = ["atoma-auth", "atoma-proxy-service", "atoma-proxy", "atoma-state"] [workspace.package] +version = "0.1.0" edition = "2021" license = "Apache-2.0" -version = "0.1.0" [workspace.dependencies] -anyhow = "1.0.91" -async-trait = "0.1.88" -atoma-auth = { path = "./atoma-auth" } -atoma-p2p = { git = "https://github.com/atoma-network/atoma-node.git", package = "atoma-p2p", branch = "main" } -atoma-proxy-service = { path = "./atoma-proxy-service" } -atoma-state = { path = "./atoma-state" } -atoma-sui = { git = "https://github.com/atoma-network/atoma-node.git", package = "atoma-sui", branch = "main" } -atoma-utils = { git = "https://github.com/atoma-network/atoma-node.git", package = "atoma-utils", branch = "main" } -axum = "0.7.7" -base64 = "0.22.1" -bcs = "0.1.6" -blake2 = "0.10.6" -blake3 = "1.6.1" -chrono = "=0.4.39" -clap = "4.5.31" -config = "0.14.1" -dcap-qvl = "0.1.6" -fastcrypto = { git = "https://github.com/MystenLabs/fastcrypto", rev = "69d496c71fb37e3d22fe85e5bbfd4256d61422b9", package = "fastcrypto" } -fastcrypto-zkp = { git = "https://github.com/MystenLabs/fastcrypto", rev = "69d496c71fb37e3d22fe85e5bbfd4256d61422b9", package = "fastcrypto-zkp" } -fastrand = "2.3.0" -flume = "0.11.1" -futures = "0.3.31" -hex = "0.4.3" -hf-hub = "0.3.2" -isocountry = "0.3.2" -itertools = "0.14.0" -jsonwebtoken = "9.3.0" -mockito = "1.6.1" -once_cell = "1.21.0" -opentelemetry = "0.27.1" -opentelemetry-otlp = "0.27.0" -opentelemetry_sdk = "0.27.1" -pem = "3.0.5" -prometheus = "0.13.4" -proptest = "1.6.0" -rand = "0.8.5" -regex = "1.11.1" -remote-attestation = { git = "https://github.com/atoma-network/nvrust", branch = "main" } -reqwest = "0.12.12" -rsa = "0.9.7" -serde = "1.0.214" -serde_json = "1.0.140" -serde_yaml = "0.9.34" -serial_test = "3.1.1" -shared-crypto = { git = "https://github.com/mystenlabs/sui", package = "shared-crypto", tag = "testnet-v1.45.2" } -sqlx = { version = "0.8.2", features = [ "postgres", "runtime-tokio-native-tls" ] } -sui-keys = { git = "https://github.com/mystenlabs/sui", package = "sui-keys", tag = "testnet-v1.45.2" } -sui-sdk = { git = "https://github.com/mystenlabs/sui", package = "sui-sdk", tag = "testnet-v1.45.2" } -sui-sdk-types = "0.0.2" -thiserror = "2.0.12" -tokenizers = "0.21.0" -tokio = "1.41.0" -toml = "0.8.19" -tonic = "0.12" -tower = "0.5.1" -tower-http = "0.6.2" -tracing = "0.1.40" -tracing-appender = "0.2.3" -tracing-loki = "0.2.6" +anyhow = "1.0.91" +async-trait = "0.1.88" +atoma-auth = { path = "./atoma-auth" } +atoma-proxy-service = { path = "./atoma-proxy-service" } +atoma-state = { path = "./atoma-state" } +atoma-p2p = { git = "https://github.com/atoma-network/atoma-node.git", package = "atoma-p2p", branch = "main" } +atoma-sui = { git = "https://github.com/atoma-network/atoma-node.git", package = "atoma-sui", branch = "main" } +atoma-utils = { git = "https://github.com/atoma-network/atoma-node.git", package = "atoma-utils", branch = "main" } +axum = "0.7.7" +base64 = "0.22.1" +bcs = "0.1.6" +blake2 = "0.10.6" +blake3 = "1.6.1" +chrono = "=0.4.39" +clap = "4.5.31" +config = "0.14.1" +fastcrypto = { git = "https://github.com/MystenLabs/fastcrypto", rev = "69d496c71fb37e3d22fe85e5bbfd4256d61422b9", package = "fastcrypto" } +fastcrypto-zkp = { git = "https://github.com/MystenLabs/fastcrypto", rev = "69d496c71fb37e3d22fe85e5bbfd4256d61422b9", package = "fastcrypto-zkp" } +fastrand = "2.3.0" +flume = "0.11.1" +futures = "0.3.31" +hex = "0.4.3" +hf-hub = "0.3.2" +isocountry = "0.3.2" +itertools = "0.14.0" +jsonwebtoken = "9.3.0" +tracing-loki = "0.2.6" +mockito = "1.6.1" +remote-attestation = { git = "https://github.com/atoma-network/nvrust", branch = "main" } +once_cell = "1.21.0" +opentelemetry = "0.27.1" +opentelemetry-otlp = "0.27.0" +opentelemetry_sdk = "0.27.1" +prometheus = "0.13.4" +pem = "3.0.5" +proptest = "1.6.0" +rand = "0.8.5" +regex = "1.11.1" +reqwest = "0.12.12" +rsa = "0.9.7" +serde = "1.0.214" +serde_json = "1.0.140" +serde_yaml = "0.9.34" +serial_test = "3.1.1" +shared-crypto = { git = "https://github.com/mystenlabs/sui", package = "shared-crypto", tag = "testnet-v1.45.2" } +sqlx = { version = "0.8.2", features = [ + "postgres", + "runtime-tokio-native-tls", +] } +sui-keys = { git = "https://github.com/mystenlabs/sui", package = "sui-keys", tag = "testnet-v1.45.2" } +sui-sdk = { git = "https://github.com/mystenlabs/sui", package = "sui-sdk", tag = "testnet-v1.45.2" } +sui-sdk-types = "0.0.2" +thiserror = "2.0.12" +tokenizers = "0.21.0" +tokio = "1.41.0" +toml = "0.8.19" +tonic = "0.12" +tower = "0.5.1" +tower-http = "0.6.2" +tracing = "0.1.40" +tracing-appender = "0.2.3" tracing-opentelemetry = "0.28.0" -tracing-subscriber = "0.3.18" -url = "2.5.4" -utoipa = "5.2.0" -utoipa-swagger-ui = "8.0.3" -uuid = "1.15.1" -x25519-dalek = "2.0.1" -zeroize = "1.8.1" +tracing-subscriber = "0.3.18" +url = "2.5.4" +utoipa = "5.2.0" +utoipa-swagger-ui = "8.0.3" +uuid = "1.15.1" +x25519-dalek = "2.0.1" +zeroize = "1.8.1" diff --git a/atoma-auth/Cargo.toml b/atoma-auth/Cargo.toml index 5cf228e7..34127e05 100644 --- a/atoma-auth/Cargo.toml +++ b/atoma-auth/Cargo.toml @@ -1,39 +1,39 @@ [package] +name = "atoma-auth" +version.workspace = true edition.workspace = true license.workspace = true -name = "atoma-auth" -version.workspace = true [dependencies] -anyhow.workspace = true -atoma-state.workspace = true -atoma-sui.workspace = true -atoma-utils.workspace = true -base64.workspace = true -bcs.workspace = true -blake2.workspace = true -chrono.workspace = true -config.workspace = true -fastcrypto.workspace = true +anyhow.workspace = true +atoma-state.workspace = true +atoma-sui.workspace = true +atoma-utils.workspace = true +base64.workspace = true +bcs.workspace = true +blake2.workspace = true +chrono.workspace = true +config.workspace = true +fastcrypto.workspace = true fastcrypto-zkp.workspace = true -flume.workspace = true -hex.workspace = true -itertools.workspace = true -jsonwebtoken.workspace = true -pem.workspace = true -rand.workspace = true -regex.workspace = true -reqwest.workspace = true -rsa.workspace = true -serde = { workspace = true, features = [ "derive" ] } -serde_json.workspace = true -shared-crypto.workspace = true -sui-keys.workspace = true -sui-sdk.workspace = true -sui-sdk-types = { workspace = true, features = [ "serde" ] } -thiserror.workspace = true -tokio.workspace = true -tracing.workspace = true +flume.workspace = true +hex.workspace = true +itertools.workspace = true +jsonwebtoken.workspace = true +pem.workspace = true +rand.workspace = true +regex.workspace = true +reqwest.workspace = true +rsa.workspace = true +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true +shared-crypto.workspace = true +sui-keys.workspace = true +sui-sdk.workspace = true +sui-sdk-types = { workspace = true, features = ["serde"] } +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true [features] -google-oauth = [ ] +google-oauth = [] From 6c38b8cdfaa8cfd2fcb907024ffa95904765babd Mon Sep 17 00:00:00 2001 From: chad Date: Mon, 24 Mar 2025 18:01:01 -0500 Subject: [PATCH 7/7] chore: remove unnecessary blocking reads --- atoma-proxy/src/server/handlers/chat_completions.rs | 2 +- atoma-proxy/src/server/handlers/image_generations.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/atoma-proxy/src/server/handlers/chat_completions.rs b/atoma-proxy/src/server/handlers/chat_completions.rs index 0b347bd1..c6bb1d83 100644 --- a/atoma-proxy/src/server/handlers/chat_completions.rs +++ b/atoma-proxy/src/server/handlers/chat_completions.rs @@ -599,7 +599,7 @@ async fn handle_non_streaming_response( let verify_hash = endpoint != CONFIDENTIAL_CHAT_COMPLETIONS_PATH; - let guard: tokio::sync::RwLockReadGuard<'_, atoma_auth::Sui> = state.sui.blocking_read(); + let guard: tokio::sync::RwLockReadGuard<'_, atoma_auth::Sui> = state.sui.read().await; let keystore = guard.get_keystore(); let proxy_signature = verify_and_sign_response(&response.0, verify_hash, keystore)?; diff --git a/atoma-proxy/src/server/handlers/image_generations.rs b/atoma-proxy/src/server/handlers/image_generations.rs index 303e26b3..6ae5f06d 100644 --- a/atoma-proxy/src/server/handlers/image_generations.rs +++ b/atoma-proxy/src/server/handlers/image_generations.rs @@ -388,7 +388,7 @@ async fn handle_image_generation_response( }) .map(Json)?; - let guard = state.sui.blocking_read(); + let guard = state.sui.read().await; let keystore = guard.get_keystore(); let verify_hash = endpoint != CONFIDENTIAL_IMAGE_GENERATIONS_PATH;