diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f33cd6fb..6a3a0ac0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,7 +10,7 @@ name: CI merge_group: env: - toolchain: 1.84.0 + toolchain: stable CARGO_HTTP_MULTIPLEXING: false CARGO_TERM_COLOR: always CARGO_UNSTABLE_SPARSE_REGISTRY: true diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index d7ab78d1..089ba8f3 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -3,7 +3,7 @@ name: Coverage on: [push, pull_request] env: - toolchain: 1.84.0 + toolchain: stable CARGO_HTTP_MULTIPLEXING: false CARGO_TERM_COLOR: always CARGO_UNSTABLE_SPARSE_REGISTRY: true diff --git a/.gitignore b/.gitignore index 00acef7e..729ff475 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ config.test.toml /config.toml local_key .rest/ +helm/atoma-proxy/charts/ open_router.json diff --git a/Dockerfile b/Dockerfile index fbbbbc43..f1e5c1ec 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,8 +17,8 @@ RUN apt-get update && apt-get install -y \ curl \ && rm -rf /var/lib/apt/lists/* -# Install Rust 1.84.0 -RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain 1.84.0 \ +# Install Rust 1.87.0 +RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain 1.87.0 \ && . 
"$HOME/.cargo/env" # Add cargo to PATH diff --git a/atoma-auth/src/auth.rs b/atoma-auth/src/auth.rs index f58e6ff4..ab302f9d 100644 --- a/atoma-auth/src/auth.rs +++ b/atoma-auth/src/auth.rs @@ -118,7 +118,56 @@ pub enum AuthError { TimestampConversionError, } -type Result = std::result::Result; +type Result = std::result::Result>; + +/// Sends an event to the state manager +/// +/// # Arguments +/// * `state_manager_sender` - The sender for the state manager +/// * `event` - The event to be sent +/// +/// # Returns +/// * `Result<()>` - If the event was sent successfully +#[instrument(level = "trace", skip_all)] +fn send_event( + state_manager_sender: &Sender, + event: AtomaAtomaStateManagerEvent, +) -> Result<()> { + Ok(state_manager_sender.send(event)?) +} + +/// Sends an event to the state manager and waits for a response +/// This function is used to send an event to the state manager and wait for a response. +/// It uses a oneshot channel to send the event and receive the response. 
+/// +/// # Arguments +/// * `state_manager_sender` - The sender for the state manager +/// * `event_creator` - A closure that creates the event to be sent +/// +/// # Returns +/// * `Result` - The result of the event, which can be either a success or an error +/// +/// # Errors +/// Returns an error if: +/// - The event could not be sent to the state manager +/// - The response could not be received from the oneshot channel +/// - The response from the state manager is an error +#[instrument(level = "trace", skip_all)] +async fn send_event_with_response( + state_manager_sender: &Sender, + event_creator: impl FnOnce( + oneshot::Sender>, + ) -> AtomaAtomaStateManagerEvent, +) -> Result { + let (result_sender, result_receiver) = + oneshot::channel::>(); + + state_manager_sender.send(event_creator(result_sender))?; + + let result = result_receiver.await??; + Ok(result) +} + /// The Auth struct #[derive(Clone)] pub struct Auth { @@ -201,11 +250,13 @@ impl Auth { &claims, &EncodingKey::from_secret(self.secret_key.as_ref()), )?; - self.state_manager_sender - .send(AtomaAtomaStateManagerEvent::StoreRefreshToken { + send_event( + &self.state_manager_sender, + AtomaAtomaStateManagerEvent::StoreRefreshToken { user_id, refresh_token_hash: self.hash_string(&token), - })?; + }, + )?; Ok(token) } @@ -233,7 +284,7 @@ impl Auth { let claims = token_data.claims; if claims.refresh_token_hash.is_none() != is_refresh { - return Err(AuthError::NotRefreshToken); + return Err(Box::new(AuthError::NotRefreshToken)); } Ok(claims) } @@ -254,14 +305,14 @@ impl Auth { user_id: i64, refresh_token_hash: &str, ) -> Result { - let (result_sender, result_receiver) = oneshot::channel(); - self.state_manager_sender - .send(AtomaAtomaStateManagerEvent::IsRefreshTokenValid { + send_event_with_response(&self.state_manager_sender, |result_receiver| { + AtomaAtomaStateManagerEvent::IsRefreshTokenValid { user_id, refresh_token_hash: refresh_token_hash.to_string(), - result_sender, - })?; - 
Ok(result_receiver.await??) + result_sender: result_receiver, + } + }) + .await } /// Generate a new access token from a refresh token @@ -283,7 +334,7 @@ impl Auth { .check_refresh_token_validity(claims.user_id, &refresh_token_hash) .await? { - return Err(AuthError::InvalidRefreshToken); + return Err(Box::new(AuthError::InvalidRefreshToken)); } let expiration = Utc::now() + Duration::days(self.access_token_lifetime as i64); @@ -330,24 +381,23 @@ impl Auth { user_profile: &UserProfile, password: &str, ) -> Result<(String, String)> { - let (result_sender, result_receiver) = oneshot::channel(); let password_salt = rand::thread_rng() .sample_iter(&rand::distributions::Alphanumeric) .take(30) .map(char::from) .collect::(); - self.state_manager_sender - .send(AtomaAtomaStateManagerEvent::RegisterUserWithPassword { + let user_id = send_event_with_response(&self.state_manager_sender, |result_sender| { + AtomaAtomaStateManagerEvent::RegisterUserWithPassword { user_profile: user_profile.clone(), password: self.hash_string(&format!("{password_salt}:{password}")), password_salt, result_sender, - })?; - let user_id = result_receiver - .await?? - .map(|user_id| user_id as u64) - .ok_or_else(|| AuthError::UserAlreadyRegistered)?; + } + }) + .await? 
+ .map(|user_id| user_id as u64) + .ok_or_else(|| AuthError::UserAlreadyRegistered)?; let refresh_token = self.generate_refresh_token(user_id as i64).await?; let access_token = self.generate_access_token(&refresh_token).await?; Ok((refresh_token, access_token)) @@ -363,28 +413,25 @@ impl Auth { email: &str, password: &str, ) -> Result<(String, String)> { - let (result_sender, result_receiver) = oneshot::channel(); - self.state_manager_sender - .send(AtomaAtomaStateManagerEvent::GetPasswordSalt { + let password_salt = send_event_with_response(&self.state_manager_sender, |result_sender| { + AtomaAtomaStateManagerEvent::GetPasswordSalt { email: email.to_string(), result_sender, - })?; - let password_salt = result_receiver.await??; - - let password_salt = - password_salt.ok_or_else(|| AuthError::PasswordNotValidOrUserNotFound)?; + } + }) + .await? + .ok_or_else(|| AuthError::PasswordNotValidOrUserNotFound)?; - let (result_sender, result_receiver) = oneshot::channel(); - self.state_manager_sender - .send(AtomaAtomaStateManagerEvent::GetUserIdByEmailPassword { + let user_id = send_event_with_response(&self.state_manager_sender, |result_sender| { + AtomaAtomaStateManagerEvent::GetUserIdByEmailPassword { email: email.to_string(), password: self.hash_string(&format!("{password_salt}:{password}")), result_sender, - })?; - let user_id = result_receiver - .await?? - .map(|user_id| user_id as u64) - .ok_or_else(|| AuthError::PasswordNotValidOrUserNotFound)?; + } + }) + .await? 
+ .map(|user_id| user_id as u64) + .ok_or_else(|| AuthError::PasswordNotValidOrUserNotFound)?; let refresh_token = self.generate_refresh_token(user_id as i64).await?; let access_token = self.generate_access_token(&refresh_token).await?; Ok((refresh_token, access_token)) @@ -403,26 +450,25 @@ impl Auth { &self.google_public_keys, )?; - let (result_sender, result_receiver) = oneshot::channel(); - let email = match claims.email { - Some(email) => email, - None => { - return Err(google::GoogleError::EmailNotFound)?; - } + let Some(email) = claims.email else { + return Err(google::GoogleError::EmailNotFound)?; }; + // In case this user doesn't have an account yet, we will add the password salt let password_salt = rand::thread_rng() .sample_iter(&rand::distributions::Alphanumeric) .take(30) .map(char::from) .collect::(); - self.state_manager_sender - .send(AtomaAtomaStateManagerEvent::OAuth { + + let user_id = send_event_with_response(&self.state_manager_sender, |result_sender| { + AtomaAtomaStateManagerEvent::OAuth { email, password_salt, result_sender, - })?; - let user_id = result_receiver.await??; + } + }) + .await?; let refresh_token = self.generate_refresh_token(user_id).await?; let access_token = self.generate_access_token(&refresh_token).await?; Ok((refresh_token, access_token)) @@ -448,12 +494,14 @@ impl Auth { .take(API_TOKEN_LENGTH) .map(char::from) .collect(); - self.state_manager_sender - .send(AtomaAtomaStateManagerEvent::StoreNewApiToken { + send_event( + &self.state_manager_sender, + AtomaAtomaStateManagerEvent::StoreNewApiToken { user_id: claims.user_id, api_token: api_token.clone(), name, - })?; + }, + )?; Ok(api_token) } @@ -472,12 +520,14 @@ impl Auth { #[instrument(level = "info", skip(self))] pub async fn revoke_api_token(&self, jwt: &str, api_token_id: i64) -> Result<()> { let claims = self.get_claims_from_token(jwt).await?; - self.state_manager_sender - .send(AtomaAtomaStateManagerEvent::RevokeApiToken { + + send_event( + 
&self.state_manager_sender, + AtomaAtomaStateManagerEvent::RevokeApiToken { user_id: claims.user_id, api_token_id, - })?; - Ok(()) + }, + ) } /// Get all API tokens for a user @@ -495,13 +545,13 @@ impl Auth { pub async fn get_all_api_tokens(&self, jwt: &str) -> Result> { let claims = self.get_claims_from_token(jwt).await?; - let (result_sender, result_receiver) = oneshot::channel(); - self.state_manager_sender - .send(AtomaAtomaStateManagerEvent::GetApiTokensForUser { + send_event_with_response(&self.state_manager_sender, |result_sender| { + AtomaAtomaStateManagerEvent::GetApiTokensForUser { user_id: claims.user_id, result_sender, - })?; - Ok(result_receiver.await??) + } + }) + .await } /// Get the claims from the token @@ -527,7 +577,7 @@ impl Auth { ) .await? { - return Err(AuthError::RevokedToken); + return Err(Box::new(AuthError::RevokedToken)); } Ok(claims) } @@ -571,21 +621,24 @@ impl Auth { /// /// Map a base64 string to a bit array by taking each char's index and convert it to binary form with one bit per u8 /// element in the output. Returns SignatureError if one of the characters is not in the base64 charset. 
+ #[allow(clippy::cast_possible_truncation)] fn base64_to_bitarray(input: &str) -> Result> { - use itertools::Itertools; const BASE64_URL_CHARSET: &str = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"; - input .chars() - .map(|c| { + .flat_map(|c| { BASE64_URL_CHARSET .find(c) - .map(|index| u8::try_from(index).map_err(AuthError::IntConversionError)) - .unwrap() - .map(|index| (0..6).rev().map(move |i| index >> i & 1)) + .ok_or_else(|| { + Box::new(AuthError::FailedToParseSignature(format!( + "Invalid character: {c}" + ))) + }) + .map(|index| (0..6).rev().map(move |i| Ok((index >> i & 1) as u8))) + .into_iter() }) - .flatten_ok() + .flatten() .collect() } @@ -701,9 +754,9 @@ impl Auth { } UserSignature::Simple(simple_signature) => (simple_signature, None), _ => { - return Err(AuthError::FailedToParseSignature( + return Err(Box::new(AuthError::FailedToParseSignature( "Unsupported signature".to_string(), - )) + ))) } }; match signature { @@ -777,12 +830,13 @@ impl Auth { ), )?; - self.state_manager_sender - .send(AtomaAtomaStateManagerEvent::UpdateSuiAddress { + send_event( + &self.state_manager_sender, + AtomaAtomaStateManagerEvent::UpdateSuiAddress { user_id: claims.user_id, sui_address: sui_address.to_string(), - })?; - Ok(()) + }, + ) } /// Updates the balance of the user @@ -815,14 +869,13 @@ impl Auth { ) -> Result<()> { let claims = self.validate_token(jwt, false)?; - let (result_sender, result_receiver) = oneshot::channel(); - self.state_manager_sender.send( + send_event_with_response(&self.state_manager_sender, |result_sender| { AtomaAtomaStateManagerEvent::InsertNewUsdcPaymentDigest { digest: transaction_digest.to_string(), result_sender, - }, - )?; - result_receiver.await??; + } + }) + .await?; let mut balance_changes = Err(anyhow!("No balance changes found")); for _ in 0..SUI_BALANCE_RETRY_COUNT { balance_changes = self @@ -845,14 +898,14 @@ impl Auth { if tag.address.to_hex() == self.sui.read().await.usdc_package_id.to_hex() { if 
balance_change.amount < 0 { if sender.is_some() { - return Err(AuthError::MultipleSenders); + return Err(Box::new(AuthError::MultipleSenders)); } if let Owner::AddressOwner(owner) = &balance_change.owner { sender = Some(*owner); } } else { if receiver.is_some() { - return Err(AuthError::MultipleReceivers); + return Err(Box::new(AuthError::MultipleReceivers)); } money_in = Some(balance_change.amount); if let Owner::AddressOwner(owner) = &balance_change.owner { @@ -863,7 +916,7 @@ impl Auth { } } if sender.is_none() || receiver.is_none() { - return Err(AuthError::SenderOrReceiverNotFound); + return Err(Box::new(AuthError::SenderOrReceiverNotFound)); } let sender = sender.unwrap(); let receiver = receiver.unwrap(); @@ -878,37 +931,40 @@ impl Auth { != Self::get_sui_address_from_signature(&signature, transaction_digest)? .to_string() { - return Err(AuthError::PaymentNotForThisUser); + return Err(Box::new(AuthError::PaymentNotForThisUser)); } } #[cfg(not(feature = "google-oauth"))] Some(_) => { - return Err(AuthError::ZkLoginNotEnabled); + return Err(Box::new(AuthError::ZkLoginNotEnabled)); } None => { - let (result_sender, result_receiver) = oneshot::channel(); - self.state_manager_sender - .send(AtomaAtomaStateManagerEvent::ConfirmUser { - sui_address: sender.to_string(), - user_id: claims.user_id, - result_sender, - })?; - let is_their_wallet = result_receiver.await??; + let is_their_wallet = + send_event_with_response(&self.state_manager_sender, |result_sender| { + AtomaAtomaStateManagerEvent::ConfirmUser { + sui_address: sender.to_string(), + user_id: claims.user_id, + result_sender, + } + }) + .await?; if !is_their_wallet { - return Err(AuthError::PaymentNotForThisUser); + return Err(Box::new(AuthError::PaymentNotForThisUser)); } } } // We are the receiver and we know the sender - self.state_manager_sender - .send(AtomaAtomaStateManagerEvent::TopUpBalance { + send_event( + &self.state_manager_sender, + AtomaAtomaStateManagerEvent::TopUpBalance { user_id: 
claims.user_id, amount: i64::try_from(money_in.unwrap()).map_err(|e| { AuthError::AnyhowError(anyhow!("Failed to convert amount: {e}")) })?, - })?; + }, + )?; } Ok(()) } @@ -928,14 +984,80 @@ impl Auth { /// * If the verification fails pub async fn get_sui_address(&self, jwt: &str) -> Result> { let claims = self.validate_token(jwt, false)?; - let (result_sender, result_receiver) = oneshot::channel(); - self.state_manager_sender - .send(AtomaAtomaStateManagerEvent::GetSuiAddress { + send_event_with_response(&self.state_manager_sender, |result_sender| { + AtomaAtomaStateManagerEvent::GetSuiAddress { user_id: claims.user_id, result_sender, - })?; - let sui_address = result_receiver.await; - Ok(sui_address??) + } + }) + .await + } +} + +impl From for Box { + fn from(err: anyhow::Error) -> Self { + Self::new(AuthError::AnyhowError(err)) + } +} + +impl From for Box { + fn from(err: jsonwebtoken::errors::Error) -> Self { + Self::new(AuthError::JsonWebTokenError(err)) + } +} + +impl From> for Box { + fn from(err: flume::SendError) -> Self { + Self::new(AuthError::FlumeError(err)) + } +} + +impl From for Box { + fn from(err: tokio::sync::oneshot::error::RecvError) -> Self { + Self::new(AuthError::OneShotReceiveError(err)) + } +} + +impl From for Box { + fn from(err: AtomaStateManagerError) -> Self { + Self::new(AuthError::AtomaStateManagerError(err)) + } +} + +impl From for Box { + fn from(err: reqwest::Error) -> Self { + Self::new(AuthError::ReqwestError(err)) + } +} + +impl From for Box { + fn from(err: bcs::Error) -> Self { + Self::new(AuthError::BcsError(err)) + } +} + +impl From for Box { + fn from(err: FastCryptoError) -> Self { + Self::new(AuthError::FastCryptoError(err)) + } +} + +impl From for Box { + fn from(err: std::num::TryFromIntError) -> Self { + Self::new(AuthError::IntConversionError(err)) + } +} + +#[cfg(feature = "google-oauth")] +impl From for Box { + fn from(err: crate::google::GoogleError) -> Self { + Self::new(AuthError::GoogleError(err)) + } +} + 
+impl From for Box { + fn from(err: SuiError) -> Self { + Self::new(AuthError::SuiError(err)) } } diff --git a/atoma-proxy-service/src/components/openapi.rs b/atoma-proxy-service/src/components/openapi.rs index 0f9e7cad..07130368 100644 --- a/atoma-proxy-service/src/components/openapi.rs +++ b/atoma-proxy-service/src/components/openapi.rs @@ -147,7 +147,7 @@ pub fn openapi_router() -> Router { let spec_path = docs_dir.join("openapi.yml"); fs::write(&spec_path, spec).expect("Failed to write OpenAPI spec to file"); - println!("OpenAPI spec written to: {spec_path:?}"); + println!("OpenAPI spec written to: {}", spec_path.display()); } Router::new().merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", openapi)) diff --git a/atoma-proxy-service/src/handlers/auth.rs b/atoma-proxy-service/src/handlers/auth.rs index 73d44b08..77f74704 100644 --- a/atoma-proxy-service/src/handlers/auth.rs +++ b/atoma-proxy-service/src/handlers/auth.rs @@ -276,7 +276,7 @@ pub async fn register( .await .map_err(|e| { error!("Failed to register user: {:?}", e); - match e { + match *e { AuthError::UserAlreadyRegistered => StatusCode::CONFLICT, _ => StatusCode::INTERNAL_SERVER_ERROR, } @@ -325,7 +325,7 @@ pub async fn login( .await .map_err(|e| { error!("Failed to login user: {:?}", e); - match e { + match *e { AuthError::PasswordNotValidOrUserNotFound => StatusCode::UNAUTHORIZED, _ => StatusCode::INTERNAL_SERVER_ERROR, } diff --git a/atoma-proxy/src/server/components/openapi.rs b/atoma-proxy/src/server/components/openapi.rs index c2c65c70..e53cf2e2 100644 --- a/atoma-proxy/src/server/components/openapi.rs +++ b/atoma-proxy/src/server/components/openapi.rs @@ -240,7 +240,7 @@ pub fn openapi_routes() -> Router { let spec_path = docs_dir.join("openapi.yml"); fs::write(&spec_path, spec).expect("Failed to write OpenAPI spec to file"); - println!("OpenAPI spec written to: {spec_path:?}"); + println!("OpenAPI spec written to: {}", spec_path.display()); } Router::new() diff --git 
a/atoma-proxy/src/server/handlers/chat_completions.rs b/atoma-proxy/src/server/handlers/chat_completions.rs index 2609cb06..376c684f 100644 --- a/atoma-proxy/src/server/handlers/chat_completions.rs +++ b/atoma-proxy/src/server/handlers/chat_completions.rs @@ -34,6 +34,7 @@ use openai_api::{ }; use openai_api::{CreateChatCompletionRequest, CreateChatCompletionStreamRequest}; use opentelemetry::KeyValue; +use reqwest::StatusCode; use serde::Deserialize; use serde_json::Value; use sqlx::types::chrono::{DateTime, Utc}; @@ -46,9 +47,10 @@ use super::metrics::{ CHAT_COMPLETIONS_INPUT_TOKENS, CHAT_COMPLETIONS_INPUT_TOKENS_PER_USER, CHAT_COMPLETIONS_LATENCY_METRICS, CHAT_COMPLETIONS_NUM_REQUESTS, CHAT_COMPLETIONS_TOTAL_TOKENS, CHAT_COMPLETIONS_TOTAL_TOKENS_PER_USER, CHAT_COMPLETION_REQUESTS_PER_USER, - INTENTIONALLY_CANCELLED_CHAT_COMPLETION_STREAMING_REQUESTS, TOTAL_COMPLETED_REQUESTS, - TOTAL_FAILED_CHAT_REQUESTS, TOTAL_FAILED_REQUESTS, - UNSUCCESSFUL_CHAT_COMPLETION_REQUESTS_PER_USER, + INTENTIONALLY_CANCELLED_CHAT_COMPLETION_STREAMING_REQUESTS, TOTAL_BAD_REQUESTS, + TOTAL_COMPLETED_REQUESTS, TOTAL_FAILED_CHAT_REQUESTS, TOTAL_FAILED_REQUESTS, + TOTAL_LOCKED_REQUESTS, TOTAL_TOO_EARLY_REQUESTS, TOTAL_TOO_MANY_REQUESTS, + TOTAL_UNAUTHORIZED_REQUESTS, UNSUCCESSFUL_CHAT_COMPLETION_REQUESTS_PER_USER, }; use super::request_model::{ComputeUnitsEstimate, RequestModel}; use super::{ @@ -78,6 +80,12 @@ pub const CHAT_COMPLETIONS_PATH: &str = "/v1/chat/completions"; /// The messages field in the request payload. 
const MESSAGES: &str = "messages"; +/// The model key +const MODEL_KEY: &str = "model"; + +/// The user id key +const USER_ID_KEY: &str = "user_id"; + #[derive(OpenApi)] #[openapi( paths(chat_completions_create, chat_completions_create_stream), @@ -175,12 +183,32 @@ pub async fn chat_completions_create( Ok(response) } Err(e) => { - TOTAL_FAILED_CHAT_REQUESTS - .add(1, &[KeyValue::new("model", metadata.model_name.clone())]); - TOTAL_FAILED_REQUESTS - .add(1, &[KeyValue::new("model", metadata.model_name.clone())]); - UNSUCCESSFUL_CHAT_COMPLETION_REQUESTS_PER_USER - .add(1, &[KeyValue::new("user_id", metadata.user_id)]); + let model = metadata.model_name.clone(); + match e.status_code() { + StatusCode::TOO_MANY_REQUESTS => { + TOTAL_TOO_MANY_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::BAD_REQUEST => { + TOTAL_BAD_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::LOCKED => { + TOTAL_LOCKED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::TOO_EARLY => { + TOTAL_TOO_EARLY_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::UNAUTHORIZED => { + TOTAL_UNAUTHORIZED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + _ => { + TOTAL_FAILED_CHAT_REQUESTS + .add(1, &[KeyValue::new(MODEL_KEY, model.clone())]); + TOTAL_FAILED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + + UNSUCCESSFUL_CHAT_COMPLETION_REQUESTS_PER_USER + .add(1, &[KeyValue::new(USER_ID_KEY, metadata.user_id)]); + } + } if let Some(stack_small_id) = metadata.selected_stack_small_id { update_state_manager( &state.state_manager_sender, diff --git a/atoma-proxy/src/server/handlers/completions.rs b/atoma-proxy/src/server/handlers/completions.rs index 9f28644e..9f2d4078 100644 --- a/atoma-proxy/src/server/handlers/completions.rs +++ b/atoma-proxy/src/server/handlers/completions.rs @@ -23,6 +23,7 @@ use openai_api_completions::{ Usage, }; use opentelemetry::KeyValue; +use reqwest::StatusCode; use serde::Deserialize; 
use serde_json::Value; use sqlx::types::chrono::{DateTime, Utc}; @@ -31,11 +32,14 @@ use utoipa::OpenApi; use super::metrics::{ CHAT_COMPLETIONS_COMPLETIONS_TOKENS, CHAT_COMPLETIONS_COMPLETIONS_TOKENS_PER_USER, - CHAT_COMPLETIONS_INPUT_TOKENS, CHAT_COMPLETIONS_INPUT_TOKENS_PER_USER, - CHAT_COMPLETIONS_LATENCY_METRICS, CHAT_COMPLETIONS_NUM_REQUESTS, CHAT_COMPLETIONS_TOTAL_TOKENS, + CHAT_COMPLETIONS_CONFIDENTIAL_NUM_REQUESTS, CHAT_COMPLETIONS_INPUT_TOKENS, + CHAT_COMPLETIONS_INPUT_TOKENS_PER_USER, CHAT_COMPLETIONS_LATENCY_METRICS, + CHAT_COMPLETIONS_NUM_REQUESTS, CHAT_COMPLETIONS_TOTAL_TOKENS, CHAT_COMPLETIONS_TOTAL_TOKENS_PER_USER, CHAT_COMPLETION_REQUESTS_PER_USER, - INTENTIONALLY_CANCELLED_CHAT_COMPLETION_STREAMING_REQUESTS, TOTAL_COMPLETED_REQUESTS, - TOTAL_FAILED_CHAT_REQUESTS, TOTAL_FAILED_REQUESTS, + INTENTIONALLY_CANCELLED_CHAT_COMPLETION_STREAMING_REQUESTS, TOTAL_BAD_REQUESTS, + TOTAL_COMPLETED_REQUESTS, TOTAL_FAILED_CHAT_CONFIDENTIAL_REQUESTS, TOTAL_FAILED_CHAT_REQUESTS, + TOTAL_FAILED_REQUESTS, TOTAL_LOCKED_REQUESTS, TOTAL_TOO_EARLY_REQUESTS, + TOTAL_TOO_MANY_REQUESTS, TOTAL_UNAUTHORIZED_REQUESTS, UNSUCCESSFUL_CHAT_COMPLETION_REQUESTS_PER_USER, }; use super::request_model::{ComputeUnitsEstimate, RequestModel}; @@ -57,6 +61,12 @@ pub const CONFIDENTIAL_COMPLETIONS_PATH: &str = "/v1/confidential/completions"; /// The key for the prompt in the request. const PROMPT: &str = "prompt"; +/// The model key +const MODEL_KEY: &str = "model"; + +/// The user id key +const USER_ID_KEY: &str = "user_id"; + /// The OpenAPI schema for the completions endpoint. 
#[derive(OpenApi)] #[openapi( @@ -133,12 +143,33 @@ pub async fn completions_create( Ok(response) } Err(e) => { - TOTAL_FAILED_CHAT_REQUESTS - .add(1, &[KeyValue::new("model", metadata.model_name.clone())]); - TOTAL_FAILED_REQUESTS - .add(1, &[KeyValue::new("model", metadata.model_name.clone())]); - UNSUCCESSFUL_CHAT_COMPLETION_REQUESTS_PER_USER - .add(1, &[KeyValue::new("user_id", metadata.user_id)]); + let model = metadata.model_name.clone(); + match e.status_code() { + StatusCode::TOO_MANY_REQUESTS => { + TOTAL_TOO_MANY_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::BAD_REQUEST => { + TOTAL_BAD_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::LOCKED => { + TOTAL_LOCKED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::TOO_EARLY => { + TOTAL_TOO_EARLY_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::UNAUTHORIZED => { + TOTAL_UNAUTHORIZED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + _ => { + TOTAL_FAILED_CHAT_REQUESTS + .add(1, &[KeyValue::new(MODEL_KEY, model.clone())]); + TOTAL_FAILED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + + UNSUCCESSFUL_CHAT_COMPLETION_REQUESTS_PER_USER + .add(1, &[KeyValue::new(USER_ID_KEY, metadata.user_id)]); + } + } + if let Some(stack_small_id) = metadata.selected_stack_small_id { update_state_manager( &state.state_manager_sender, @@ -381,19 +412,38 @@ pub async fn confidential_completions_create( Ok(response) => { if !is_streaming { // The streaming metric is recorded in the streamer (final chunk) - TOTAL_COMPLETED_REQUESTS.add(1, &[KeyValue::new("model", metadata.model_name)]); + CHAT_COMPLETIONS_CONFIDENTIAL_NUM_REQUESTS + .add(1, &[KeyValue::new(MODEL_KEY, metadata.model_name)]); } Ok(response) } Err(e) => { - let model_label: String = metadata.model_name.clone(); - TOTAL_FAILED_CHAT_REQUESTS.add(1, &[KeyValue::new("model", model_label.clone())]); - - // Record the failed request in the total failed requests metric - 
TOTAL_FAILED_REQUESTS.add(1, &[KeyValue::new("model", model_label)]); - UNSUCCESSFUL_CHAT_COMPLETION_REQUESTS_PER_USER - .add(1, &[KeyValue::new("user_id", metadata.user_id)]); + let model = metadata.model_name.clone(); + match e.status_code() { + StatusCode::TOO_MANY_REQUESTS => { + TOTAL_TOO_MANY_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::BAD_REQUEST => { + TOTAL_BAD_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::LOCKED => { + TOTAL_LOCKED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::TOO_EARLY => { + TOTAL_TOO_EARLY_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::UNAUTHORIZED => { + TOTAL_UNAUTHORIZED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + _ => { + TOTAL_FAILED_CHAT_CONFIDENTIAL_REQUESTS + .add(1, &[KeyValue::new(MODEL_KEY, model.clone())]); + TOTAL_FAILED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + UNSUCCESSFUL_CHAT_COMPLETION_REQUESTS_PER_USER + .add(1, &[KeyValue::new(USER_ID_KEY, metadata.user_id)]); + } + } if let Some(stack_small_id) = metadata.selected_stack_small_id { update_state_manager( &state.state_manager_sender, @@ -670,7 +720,7 @@ async fn handle_non_streaming_response( /// * `node_address` - The address of the node /// * `user_id` - The user id /// * `headers` - The headers of the request -/// * `payload` - The payload of the request +/// * `payload` - The payload of the request /// * `num_input_tokens` - The number of input tokens /// * `estimated_output_tokens` - The estimated output tokens /// * `price_per_million` - The price per million @@ -687,7 +737,7 @@ async fn handle_non_streaming_response( /// * `serde_json::Error` - If the request fails /// * `flume::Error` - If the request fails /// * `tokio::Error` - If the request fails -/// +/// #[instrument( level = "info", skip_all, diff --git a/atoma-proxy/src/server/handlers/embeddings.rs b/atoma-proxy/src/server/handlers/embeddings.rs index fbc1db86..2e2eba48 
100644 --- a/atoma-proxy/src/server/handlers/embeddings.rs +++ b/atoma-proxy/src/server/handlers/embeddings.rs @@ -9,6 +9,7 @@ use axum::{ Extension, Json, }; use opentelemetry::KeyValue; +use reqwest::StatusCode; use serde::{Deserialize, Serialize}; use serde_json::Value; use sqlx::types::chrono::{DateTime, Utc}; @@ -28,8 +29,10 @@ use super::{ handle_status_code_error, metrics::{ EMBEDDING_TOTAL_TOKENS_PER_USER, SUCCESSFUL_TEXT_EMBEDDING_REQUESTS_PER_USER, - TEXT_EMBEDDINGS_LATENCY_METRICS, TEXT_EMBEDDINGS_NUM_REQUESTS, TOTAL_COMPLETED_REQUESTS, - TOTAL_FAILED_REQUESTS, TOTAL_FAILED_TEXT_EMBEDDING_REQUESTS, + TEXT_EMBEDDINGS_LATENCY_METRICS, TEXT_EMBEDDINGS_NUM_REQUESTS, TOTAL_BAD_REQUESTS, + TOTAL_COMPLETED_REQUESTS, TOTAL_FAILED_CONFIDENTIAL_EMBEDDING_REQUESTS, + TOTAL_FAILED_REQUESTS, TOTAL_FAILED_TEXT_EMBEDDING_REQUESTS, TOTAL_LOCKED_REQUESTS, + TOTAL_TOO_EARLY_REQUESTS, TOTAL_TOO_MANY_REQUESTS, TOTAL_UNAUTHORIZED_REQUESTS, UNSUCCESSFUL_TEXT_EMBEDDING_REQUESTS_PER_USER, }, request_model::{ComputeUnitsEstimate, RequestModel}, @@ -52,6 +55,12 @@ pub const EMBEDDINGS_PATH: &str = "/v1/embeddings"; /// The input field in the request payload. const INPUT: &str = "input"; +/// The model key +const MODEL_KEY: &str = "model"; + +/// The user id key +const USER_ID_KEY: &str = "user_id"; + // A model representing an embeddings request payload. 
/// /// This struct encapsulates the necessary fields for processing an embeddings request @@ -224,11 +233,32 @@ pub async fn embeddings_create( Ok(Json(response).into_response()) } Err(e) => { - let model_label: String = metadata.model_name.clone(); - TOTAL_FAILED_REQUESTS.add(1, &[KeyValue::new("model", model_label.clone())]); - TOTAL_FAILED_TEXT_EMBEDDING_REQUESTS.add(1, &[KeyValue::new("model", model_label)]); - UNSUCCESSFUL_TEXT_EMBEDDING_REQUESTS_PER_USER - .add(1, &[KeyValue::new("user_id", metadata.user_id)]); + let model = metadata.model_name.clone(); + match e.status_code() { + StatusCode::TOO_MANY_REQUESTS => { + TOTAL_TOO_MANY_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::BAD_REQUEST => { + TOTAL_BAD_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::LOCKED => { + TOTAL_LOCKED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::TOO_EARLY => { + TOTAL_TOO_EARLY_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::UNAUTHORIZED => { + TOTAL_UNAUTHORIZED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + _ => { + TOTAL_FAILED_TEXT_EMBEDDING_REQUESTS + .add(1, &[KeyValue::new(MODEL_KEY, model.clone())]); + TOTAL_FAILED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + + UNSUCCESSFUL_TEXT_EMBEDDING_REQUESTS_PER_USER + .add(1, &[KeyValue::new(USER_ID_KEY, metadata.user_id)]); + } + } match metadata.selected_stack_small_id { Some(stack_small_id) => { update_state_manager( @@ -374,11 +404,32 @@ pub async fn confidential_embeddings_create( Ok(Json(response).into_response()) } Err(e) => { - let model_label: String = metadata.model_name.clone(); - TOTAL_FAILED_REQUESTS.add(1, &[KeyValue::new("model", model_label.clone())]); - TOTAL_FAILED_TEXT_EMBEDDING_REQUESTS.add(1, &[KeyValue::new("model", model_label)]); - UNSUCCESSFUL_TEXT_EMBEDDING_REQUESTS_PER_USER - .add(1, &[KeyValue::new("user_id", metadata.user_id)]); + let model = metadata.model_name.clone(); + match 
e.status_code() { + StatusCode::TOO_MANY_REQUESTS => { + TOTAL_TOO_MANY_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::BAD_REQUEST => { + TOTAL_BAD_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::LOCKED => { + TOTAL_LOCKED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::TOO_EARLY => { + TOTAL_TOO_EARLY_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::UNAUTHORIZED => { + TOTAL_UNAUTHORIZED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + _ => { + TOTAL_FAILED_CONFIDENTIAL_EMBEDDING_REQUESTS + .add(1, &[KeyValue::new(MODEL_KEY, model.clone())]); + TOTAL_FAILED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + + UNSUCCESSFUL_TEXT_EMBEDDING_REQUESTS_PER_USER + .add(1, &[KeyValue::new(USER_ID_KEY, metadata.user_id)]); + } + } match metadata.selected_stack_small_id { Some(stack_small_id) => { diff --git a/atoma-proxy/src/server/handlers/image_generations.rs b/atoma-proxy/src/server/handlers/image_generations.rs index 31dbbb80..78061d74 100644 --- a/atoma-proxy/src/server/handlers/image_generations.rs +++ b/atoma-proxy/src/server/handlers/image_generations.rs @@ -6,6 +6,7 @@ use axum::response::{IntoResponse, Response}; use axum::Extension; use axum::{extract::State, http::HeaderMap, Json}; use opentelemetry::KeyValue; +use reqwest::StatusCode; use serde::{Deserialize, Serialize}; use serde_json::Value; use sqlx::types::chrono::{DateTime, Utc}; @@ -19,8 +20,10 @@ use crate::server::{http_server::ProxyState, middleware::RequestMetadataExtensio use super::metrics::{ IMAGE_GENERATION_TOTAL_TOKENS_PER_USER, IMAGE_GEN_LATENCY_METRICS, IMAGE_GEN_NUM_REQUESTS, - SUCCESSFUL_IMAGE_GENERATION_REQUESTS_PER_USER, TOTAL_COMPLETED_REQUESTS, - TOTAL_FAILED_IMAGE_GENERATION_REQUESTS, TOTAL_FAILED_REQUESTS, + SUCCESSFUL_IMAGE_GENERATION_REQUESTS_PER_USER, TOTAL_BAD_REQUESTS, TOTAL_COMPLETED_REQUESTS, + TOTAL_FAILED_CONFIDENTIAL_IMAGE_GENERATION_REQUESTS, 
TOTAL_FAILED_IMAGE_GENERATION_REQUESTS, + TOTAL_FAILED_REQUESTS, TOTAL_LOCKED_REQUESTS, TOTAL_TOO_EARLY_REQUESTS, + TOTAL_TOO_MANY_REQUESTS, TOTAL_UNAUTHORIZED_REQUESTS, UNSUCCESSFUL_IMAGE_GENERATION_REQUESTS_PER_USER, }; use super::request_model::ComputeUnitsEstimate; @@ -47,6 +50,12 @@ const N: &str = "n"; /// The size field in the request payload. const SIZE: &str = "size"; +/// The model key +const MODEL_KEY: &str = "model"; + +/// The user id key +const USER_ID_KEY: &str = "user_id"; + /// A model representing the parameters for an image generation request. /// /// This struct encapsulates the required parameters for generating images through @@ -217,12 +226,32 @@ pub async fn image_generations_create( } Err(e) => { // Record the failed request in the image generations num requests metric - let model_label: String = metadata.model_name.clone(); - TOTAL_FAILED_IMAGE_GENERATION_REQUESTS - .add(1, &[KeyValue::new("model", model_label.clone())]); + let model: String = metadata.model_name.clone(); + match e.status_code() { + StatusCode::TOO_MANY_REQUESTS => { + TOTAL_TOO_MANY_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::BAD_REQUEST => { + TOTAL_BAD_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::LOCKED => { + TOTAL_LOCKED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::TOO_EARLY => { + TOTAL_TOO_EARLY_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::UNAUTHORIZED => { + TOTAL_UNAUTHORIZED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + _ => { + TOTAL_FAILED_IMAGE_GENERATION_REQUESTS + .add(1, &[KeyValue::new(MODEL_KEY, model.clone())]); + TOTAL_FAILED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); - // Record the failed request in the total failed requests metric - TOTAL_FAILED_REQUESTS.add(1, &[KeyValue::new("model", model_label)]); + UNSUCCESSFUL_IMAGE_GENERATION_REQUESTS_PER_USER + .add(1, &[KeyValue::new(USER_ID_KEY, metadata.user_id)]); + } + } match 
metadata.selected_stack_small_id { Some(stack_small_id) => { @@ -327,20 +356,38 @@ pub async fn confidential_image_generations_create( Ok(response) => { // NOTE: At this point, we do not need to update the stack num compute units, // because the image generation response was correctly generated by a TEE node. - TOTAL_COMPLETED_REQUESTS.add(1, &[KeyValue::new("model", metadata.model_name)]); + TOTAL_COMPLETED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, metadata.model_name)]); SUCCESSFUL_IMAGE_GENERATION_REQUESTS_PER_USER - .add(1, &[KeyValue::new("user_id", metadata.user_id)]); + .add(1, &[KeyValue::new(USER_ID_KEY, metadata.user_id)]); Ok(response.into_response()) } Err(e) => { - let model_label: String = metadata.model_name.clone(); - TOTAL_FAILED_IMAGE_GENERATION_REQUESTS - .add(1, &[KeyValue::new("model", model_label.clone())]); - - // Record the failed request in the total failed requests metric - TOTAL_FAILED_REQUESTS.add(1, &[KeyValue::new("model", model_label)]); - UNSUCCESSFUL_IMAGE_GENERATION_REQUESTS_PER_USER - .add(1, &[KeyValue::new("user_id", metadata.user_id)]); + let model: String = metadata.model_name.clone(); + match e.status_code() { + StatusCode::TOO_MANY_REQUESTS => { + TOTAL_TOO_MANY_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::BAD_REQUEST => { + TOTAL_BAD_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::LOCKED => { + TOTAL_LOCKED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::TOO_EARLY => { + TOTAL_TOO_EARLY_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + StatusCode::UNAUTHORIZED => { + TOTAL_UNAUTHORIZED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + } + _ => { + TOTAL_FAILED_CONFIDENTIAL_IMAGE_GENERATION_REQUESTS + .add(1, &[KeyValue::new(MODEL_KEY, model.clone())]); + TOTAL_FAILED_REQUESTS.add(1, &[KeyValue::new(MODEL_KEY, model)]); + + UNSUCCESSFUL_IMAGE_GENERATION_REQUESTS_PER_USER + .add(1, &[KeyValue::new(USER_ID_KEY, metadata.user_id)]); + } + } 
match metadata.selected_stack_small_id { Some(stack_small_id) => { update_state_manager( diff --git a/atoma-proxy/src/server/handlers/metrics.rs b/atoma-proxy/src/server/handlers/metrics.rs index 09bfb38a..66014dfc 100644 --- a/atoma-proxy/src/server/handlers/metrics.rs +++ b/atoma-proxy/src/server/handlers/metrics.rs @@ -316,6 +316,112 @@ pub static TOTAL_FAILED_CHAT_REQUESTS: LazyLock> = LazyLock::new(|| .build() }); +/// Counter metric that tracks the total number of too many requests. +/// +/// # Metric Details +/// - Name: `atoma_total_too_many_requests` +/// - Type: Counter +/// - Labels: `model` +/// - Unit: requests (count) +pub static TOTAL_TOO_MANY_REQUESTS: LazyLock> = LazyLock::new(|| { + GLOBAL_METER + .u64_counter("atoma_total_too_many_requests") + .with_description("Total number of too many requests") + .with_unit("requests") + .build() +}); + +/// Counter metric that tracks the total number of bad requests. +/// +/// # Metric Details +/// - Name: `atoma_total_bad_requests` +/// - Type: Counter +/// - Labels: `model` +/// - Unit: requests (count) +pub static TOTAL_BAD_REQUESTS: LazyLock> = LazyLock::new(|| { + GLOBAL_METER + .u64_counter("atoma_total_bad_requests") + .with_description("Total number of bad requests") + .with_unit("requests") + .build() +}); + +/// Counter metric that tracks the total number of locked requests. +/// +/// # Metric Details +/// - Name: `atoma_total_locked_requests` +/// - Type: Counter +/// - Labels: `model` +/// - Unit: requests (count) +pub static TOTAL_LOCKED_REQUESTS: LazyLock> = LazyLock::new(|| { + GLOBAL_METER + .u64_counter("atoma_total_locked_requests") + .with_description("Total number of locked requests") + .with_unit("requests") + .build() +}); + +/// Counter metric that tracks the total number of too early requests. 
+/// +/// # Metric Details +/// - Name: `atoma_total_too_early_requests` +/// - Type: Counter +/// - Labels: `model` +/// - Unit: requests (count) +pub static TOTAL_TOO_EARLY_REQUESTS: LazyLock> = LazyLock::new(|| { + GLOBAL_METER + .u64_counter("atoma_total_too_early_requests") + .with_description("Total number of too early requests") + .with_unit("requests") + .build() +}); + +/// Counter metric that tracks the total number of unauthorized requests. +/// +/// # Metric Details +/// - Name: `atoma_total_unauthorized_requests` +/// - Type: Counter +/// - Labels: `model` +/// - Unit: requests (count) +pub static TOTAL_UNAUTHORIZED_REQUESTS: LazyLock> = LazyLock::new(|| { + GLOBAL_METER + .u64_counter("atoma_total_unauthorized_requests") + .with_description("Total number of unauthorized requests") + .with_unit("requests") + .build() +}); + +/// Counter metric that tracks the total number of confidential chat requests. +/// +/// # Metric Details +/// - Name: `atoma_total_confidential_chat_requests` +/// - Type: Counter +/// - Labels: `model` +/// - Unit: requests (count) +pub static CHAT_COMPLETIONS_CONFIDENTIAL_NUM_REQUESTS: LazyLock> = + LazyLock::new(|| { + GLOBAL_METER + .u64_counter("atoma_total_confidential_chat_requests") + .with_description("Total number of confidential chat requests") + .with_unit("requests") + .build() + }); + +/// Counter metric that tracks the total number of failed confidential chat requests. +/// +/// # Metric Details +/// - Name: `atoma_total_failed_confidential_chat_requests` +/// - Type: Counter +/// - Labels: `model` +/// - Unit: requests (count) +pub static TOTAL_FAILED_CHAT_CONFIDENTIAL_REQUESTS: LazyLock> = LazyLock::new(|| { + GLOBAL_METER + .u64_counter("atoma_total_failed_confidential_chat_requests") + .with_description("Total number of failed confidential chat requests") + .with_unit("requests") + .build() +}); + /// Counter metric that tracks the total number of failed image generation requests. 
/// /// # Metric Details @@ -331,6 +437,22 @@ pub static TOTAL_FAILED_IMAGE_GENERATION_REQUESTS: LazyLock> = Lazy .build() }); +/// Counter metric that tracks the total number of failed confidential image generation requests. +/// +/// # Metric Details +/// - Name: `atoma_total_failed_confidential_image_generation_requests` +/// - Type: Counter +/// - Labels: `model` +/// - Unit: requests (count) +pub static TOTAL_FAILED_CONFIDENTIAL_IMAGE_GENERATION_REQUESTS: LazyLock> = + LazyLock::new(|| { + GLOBAL_METER + .u64_counter("atoma_total_failed_confidential_image_generation_requests") + .with_description("Total number of failed confidential image generation requests") + .with_unit("requests") + .build() + }); + /// Counter metric that tracks the total number of failed text embedding requests. /// /// # Metric Details @@ -486,6 +608,22 @@ pub static UNSUCCESSFUL_TEXT_EMBEDDING_REQUESTS_PER_USER: LazyLock> .build() }); +/// Counter metric that tracks the total number of failed text embedding confidential requests. +/// +/// # Metric Details +/// - Name: `atoma_total_failed_text_embedding_confidential_requests` +/// - Type: Counter +/// - Labels: `model` +/// - Unit: requests (count) +pub static TOTAL_FAILED_CONFIDENTIAL_EMBEDDING_REQUESTS: LazyLock> = + LazyLock::new(|| { + GLOBAL_METER + .u64_counter("atoma_total_failed_text_embedding_confidential_requests") + .with_description("Total number of failed text embedding confidential requests") + .with_unit("requests") + .build() + }); + /// Counter metric that tracks the total number of chat completion tokens per user. 
/// /// # Metric Details diff --git a/atoma-proxy/src/server/middleware.rs b/atoma-proxy/src/server/middleware.rs index 50816c89..a1107d48 100644 --- a/atoma-proxy/src/server/middleware.rs +++ b/atoma-proxy/src/server/middleware.rs @@ -899,6 +899,7 @@ pub mod auth { use atoma_auth::Sui; use atoma_state::types::CheapestNode; use atoma_state::types::Stack; + use atoma_state::AtomaStateManagerError; use atoma_state::{timestamp_to_datetime_or_now, types::AtomaAtomaStateManagerEvent}; use axum::http::HeaderMap; use flume::Sender; @@ -1092,6 +1093,97 @@ pub mod auth { } } + /// Sends an event to the Atoma state manager. + /// + /// This function is used to send events to the state manager for processing. + /// It handles the sending of events and returns a `Result` indicating success or failure. + /// + /// # Arguments + /// * `state_manager_sender` - The sender channel for the Atoma state manager events. + /// * `event` - The event to be sent to the state manager. + /// * `event_name` - A static string representing the name of the event, used for logging. + /// * `endpoint` - The endpoint from which the event is being sent, used for error reporting. + /// + /// # Returns + /// * `Result<()>` - Returns `Ok(())` if the event was sent successfully, or an `AtomaProxyError` if there was an error. + /// + /// # Errors + /// Returns `AtomaProxyError::InternalError` if the event could not be sent, with a message detailing the error. + #[instrument(level = "trace", skip_all)] + pub fn send_event( + state_manager_sender: &Sender, + event: AtomaAtomaStateManagerEvent, + event_name: &'static str, + endpoint: &str, + ) -> Result<()> { + state_manager_sender + .send(event) + .map_err(|err| AtomaProxyError::InternalError { + message: format!("Failed to send {event_name} event: {err:?}"), + client_message: None, + endpoint: endpoint.to_string(), + }) + } + + /// Sends an event to the Atoma state manager and waits for a response. 
+ /// + /// This function is used to send events that require a response from the state manager. + /// It creates a oneshot channel to receive the response and handles any errors that may occur during sending or receiving. + /// + /// # Arguments + /// * `state_manager_sender` - The sender channel for the Atoma state manager events. + /// * `event_creator` - A closure that creates the event to be sent, taking a oneshot sender for the response. + /// * `event_name` - A static string representing the name of the event, used for logging. + /// * `endpoint` - The endpoint from which the event is being sent, used for error reporting. + /// + /// # Returns + /// * `Result` - Returns `Ok(T)` if the event was sent and a response was received successfully, or an `AtomaProxyError` if there was an error. + /// + /// # Errors + /// Returns `AtomaProxyError::InternalError` if: + /// * The event could not be sent to the state manager. + /// * The response could not be received from the oneshot channel. + /// * The response contained an error. + #[instrument(level = "trace", skip_all)] + pub async fn send_event_with_response( + state_manager_sender: &Sender, + event_creator: impl FnOnce( + oneshot::Sender>, + ) -> AtomaAtomaStateManagerEvent, + event_name: &'static str, + endpoint: &str, + ) -> Result { + let (result_sender, result_receiver) = + oneshot::channel::>(); + + state_manager_sender + .send(event_creator(result_sender)) + .map_err(|err| AtomaProxyError::InternalError { + message: format!("Failed to send {event_name} event: {err:?}"), + client_message: None, + endpoint: endpoint.to_string(), + })?; + + result_receiver + .await + .map_err(|err| AtomaProxyError::InternalError { + message: format!("Failed to receive {event_name} result: {err:?}"), + client_message: None, + endpoint: endpoint.to_string(), + })? 
+ .map_err(|err| match err { + AtomaStateManagerError::InsufficientBalance => AtomaProxyError::BalanceError { + message: "Insufficient balance to lock compute units".to_string(), + endpoint: endpoint.to_string(), + }, + _ => AtomaProxyError::InternalError { + message: format!("Failed to get {event_name} result: {err:?}"), + client_message: None, + endpoint: endpoint.to_string(), + }, + }) + } + /// Authenticates a request and attempts to lock compute units for model execution. /// /// This function performs several key operations in sequence: @@ -1184,38 +1276,23 @@ pub mod auth { let node = get_cheapest_node(state, &model, endpoint).await?; // We don't have a stack for the user, lets check if the user is using fiat currency. - let (result_sender, result_receiver) = oneshot::channel(); let fiat_locked_input_amount = num_input_tokens as i64 * node.price_per_one_million_compute_units / ONE_MILLION as i64; let fiat_locked_output_amount = max_output_tokens as i64 * node.price_per_one_million_compute_units / ONE_MILLION as i64; - state - .state_manager_sender - .send(AtomaAtomaStateManagerEvent::LockUserFiatBalance { + let locked_fiat = send_event_with_response( + &state.state_manager_sender, + |result_sender| AtomaAtomaStateManagerEvent::LockUserFiatBalance { user_id, input_amount: fiat_locked_input_amount, output_amount: fiat_locked_output_amount, result_sender, - }) - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to send LockUserFiatBalance event: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; - - let locked_fiat = result_receiver - .await - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to receive LockUserFiatBalance result: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })? 
- .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to get LockUserFiatBalance result: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; + }, + "LockUserFiatBalance", + endpoint, + ) + .await?; if locked_fiat { return Ok(StackMetadata { @@ -1230,35 +1307,21 @@ pub mod auth { }); } - let (result_sender, result_receiver) = oneshot::channel(); - - state - .state_manager_sender - .send(AtomaAtomaStateManagerEvent::GetStacksForModel { - model: model.to_string(), - free_compute_units: (num_input_tokens + max_output_tokens) as i64, - user_id, - is_confidential: false, // NOTE: This method is only used for non-confidential compute - result_sender, - }) - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to send GetStacksForModel event: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; - - let optional_stack = result_receiver - .await - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to receive GetStacksForModel result: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })? 
- .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to get GetStacksForModel result: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; + let optional_stack = send_event_with_response( + &state.state_manager_sender, + |result_sender| { + AtomaAtomaStateManagerEvent::GetStacksForModel { + model: model.to_string(), + free_compute_units: (num_input_tokens + max_output_tokens) as i64, + user_id, + is_confidential: false, // NOTE: This method is only used for non-confidential compute + result_sender, + } + }, + "GetStacksForModel", + endpoint, + ) + .await?; Ok(StackMetadata { optional_stack, @@ -1301,34 +1364,19 @@ pub mod auth { endpoint: &str, total_tokens: u64, ) -> Result> { - let (result_sender, result_receiver) = oneshot::channel(); - - state - .state_manager_sender - .send(AtomaAtomaStateManagerEvent::GetStacksForTask { + let optional_stack = send_event_with_response( + &state.state_manager_sender, + |result_sender| AtomaAtomaStateManagerEvent::GetStacksForTask { task_small_id, free_compute_units: total_tokens as i64, user_id, result_sender, - }) - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to send GetStacksForTask event: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; + }, + "GetStacksForTask", + endpoint, + ) + .await?; - let optional_stack = result_receiver - .await - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to receive GetStacksForTask result: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })? 
- .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to get GetStacksForTask result: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; if let Some(stack) = optional_stack { Ok(Some(SelectedNodeMetadata { stack_small_id: Some(stack.stack_small_id), @@ -1387,35 +1435,20 @@ pub mod auth { endpoint: &str, ) -> Result { // Get node address - let (result_sender, result_receiver) = oneshot::channel(); - state - .state_manager_sender - .send(AtomaAtomaStateManagerEvent::GetNodePublicAddress { + let node_address = send_event_with_response( + &state.state_manager_sender, + |result_sender| AtomaAtomaStateManagerEvent::GetNodePublicAddress { node_small_id: selected_node_id, result_sender, - }) - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to send GetNodePublicAddress event: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; - - let node_address = result_receiver - .await - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to receive GetNodePublicAddress result: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })? - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to get GetNodePublicAddress result: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })? - .ok_or_else(|| AtomaProxyError::NotFound { - message: format!("No node address found for node {selected_node_id}"), - endpoint: endpoint.to_string(), - })?; + }, + "GetNodePublicAddress", + endpoint, + ) + .await? + .ok_or_else(|| AtomaProxyError::NotFound { + message: format!("No node address found for node {selected_node_id}"), + endpoint: endpoint.to_string(), + })?; // Get signature let signature = state @@ -1659,32 +1692,19 @@ pub mod auth { let selected_node_id = event.selected_node_id.inner as i64; // Send the NewStackAcquired event to the state manager, so we have it in the DB. 
- let (result_sender, result_receiver) = oneshot::channel(); - state_manager_sender - .send(AtomaAtomaStateManagerEvent::NewStackAcquired { + send_event_with_response( + &state_manager_sender, + |result_sender| AtomaAtomaStateManagerEvent::NewStackAcquired { event, locked_compute_units: total_tokens as i64, transaction_timestamp: timestamp_to_datetime_or_now(timestamp_ms), user_id, result_sender, - }) - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to send NewStackAcquired event: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; - result_receiver - .await - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to receive NewStackAcquired result: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })? - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to receive NewStackAcquired result: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; + }, + "NewStackAcquired", + &endpoint, + ) + .await?; Ok(SelectedNodeMetadata { stack_small_id: Some(stack_small_id), selected_node_id, @@ -1725,30 +1745,18 @@ pub mod auth { stack_size_to_buy: u64, endpoint: String, ) -> Result<()> { - let (result_sender, result_receiver) = oneshot::channel(); - state_manager_sender - .send(AtomaAtomaStateManagerEvent::DeductFromUsdc { + send_event_with_response( + &state_manager_sender, + |result_sender| AtomaAtomaStateManagerEvent::DeductFromUsdc { user_id, amount: (price_per_one_million_compute_units * stack_size_to_buy / ONE_MILLION) as i64, result_sender, - }) - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to send DeductFromUsdc event: {err:?}"), - client_message: None, - endpoint: endpoint.clone(), - })?; - result_receiver - .await - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to receive DeductFromUsdc result: {err:?}"), - client_message: None, - endpoint: endpoint.clone(), - })? 
- .map_err(|err| AtomaProxyError::BalanceError { - message: format!("Balance error : {err:?}"), - endpoint, - })?; + }, + "DeductFromUsdc", + &endpoint, + ) + .await?; Ok(()) } @@ -1782,31 +1790,18 @@ pub mod auth { stack_size_to_buy: u64, endpoint: String, ) -> Result<()> { - let (result_sender, result_receiver) = oneshot::channel(); - state_manager_sender - .send(AtomaAtomaStateManagerEvent::RefundUsdc { + send_event_with_response( + &state_manager_sender, + |result_sender| AtomaAtomaStateManagerEvent::RefundUsdc { user_id, amount: (price_per_one_million_compute_units * stack_size_to_buy / ONE_MILLION) as i64, result_sender, - }) - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to send RefundUsdc event: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; - result_receiver - .await - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to receive RefundUsdc result: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })? 
- .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to refund USDC: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - }) + }, + "RefundUsdc", + &endpoint, + ) + .await } /// Retrieves stack metadata for a locked user stack based on the endpoint type @@ -2058,31 +2053,21 @@ pub mod auth { model: &str, endpoint: &str, ) -> Result { - let (result_sender, result_receiver) = oneshot::channel(); - state - .state_manager_sender - .send(AtomaAtomaStateManagerEvent::GetCheapestNodeForModel { - model: model.to_string(), - is_confidential: false, - result_sender, - }) - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to send GetTasksForModel event: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; - let node = result_receiver - .await - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to receive GetTasksForModel result: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })? 
- .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to get retrieve `CheapestNode` from the state manager with result: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; + let node = send_event_with_response( + &state.state_manager_sender, + |result_sender| { + // Send the GetCheapestNodeForModel event to the state manager + AtomaAtomaStateManagerEvent::GetCheapestNodeForModel { + model: model.to_string(), + is_confidential: false, + result_sender, + } + }, + "GetCheapestNodeForModel", + endpoint, + ) + .await?; + node.map_or_else( || { Err(AtomaProxyError::RequestError { @@ -2205,33 +2190,19 @@ pub mod auth { is_confidential: bool, endpoint: &str, ) -> Result> { - let (result_sender, result_receiver) = oneshot::channel(); - state - .state_manager_sender - .send(AtomaAtomaStateManagerEvent::GetStacksForModel { + let maybe_stack = send_event_with_response( + &state.state_manager_sender, + |result_sender| AtomaAtomaStateManagerEvent::GetStacksForModel { model: model.to_string(), user_id, free_compute_units, is_confidential, result_sender, - }) - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to send GetStacksForModel event: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; - let maybe_stack = result_receiver - .await - .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to receive GetStacksForModel result: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })? 
- .map_err(|err| AtomaProxyError::InternalError { - message: format!("Failed to get GetStacksForModel result: {err:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; + }, + "GetStacksForModel", + endpoint, + ) + .await?; Ok(maybe_stack.map(|stack| SelectedNodeMetadata { selected_node_id: stack.selected_node_id, stack_small_id: Some(stack.stack_small_id), @@ -2255,13 +2226,15 @@ pub mod utils { image_generations::CONFIDENTIAL_IMAGE_GENERATIONS_PATH, update_state_manager, }, http_server::{LockedComputeUnits, StackSmallId}, + middleware::auth::send_event, MODEL, }; use super::{ - auth, constants, instrument, AtomaAtomaStateManagerEvent, AtomaProxyError, Body, - HeaderValue, Parts, ProcessedRequest, ProxyState, Request, RequestMetadataExtension, - Result, State, Value, CONTENT_LENGTH, + auth::{self, send_event_with_response}, + constants, instrument, AtomaAtomaStateManagerEvent, AtomaProxyError, Body, HeaderValue, + Parts, ProcessedRequest, ProxyState, Request, RequestMetadataExtension, Result, State, + Value, CONTENT_LENGTH, }; /// Validates and prepares a request for processing by a specific stack and node. @@ -2742,29 +2715,19 @@ pub mod utils { stack_small_id: i64, endpoint: &str, ) -> Result<(String, i64)> { - let (result_sender, result_receiver) = tokio::sync::oneshot::channel(); - state - .state_manager_sender - .send(AtomaAtomaStateManagerEvent::GetNodePublicUrlAndSmallId { - stack_small_id, - result_sender, - }) - .map_err(|e| AtomaProxyError::InternalError { - message: format!("Failed to send GetNodePublicAddress event: {e:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; - let (node_address, node_small_id) = result_receiver - .await - .map_err(|e| AtomaProxyError::InternalError { - message: format!("Failed to receive GetNodePublicAddress result: {e:?}"), - client_message: None, - endpoint: endpoint.to_string(), - })? 
- .map_err(|e| AtomaProxyError::NotFound { - message: format!("Failed to get node public address: {e:?}"), - endpoint: endpoint.to_string(), - })?; + let (node_address, node_small_id) = send_event_with_response( + &state.state_manager_sender, + |result_sender| { + // Send the GetNodePublicUrlAndSmallId event to the state manager + AtomaAtomaStateManagerEvent::GetNodePublicUrlAndSmallId { + stack_small_id, + result_sender, + } + }, + "GetNodePublicUrlAndSmallId", + endpoint, + ) + .await?; if let Some(node_address) = node_address { return Ok((node_address, node_small_id)); } @@ -2843,13 +2806,13 @@ pub mod utils { stack_small_id = %stack_small_id, "Stack is in locked state for the requested node, trying to acquire a new stack" ); - state_manager_sender - .send(AtomaAtomaStateManagerEvent::LockStack { stack_small_id }) - .map_err(|e| AtomaProxyError::InternalError { - message: format!("Failed to send LockStack event: {e}"), - client_message: None, - endpoint: endpoint.to_string(), - })?; + send_event( + state_manager_sender, + AtomaAtomaStateManagerEvent::LockStack { stack_small_id }, + "LockStack", + endpoint, + )?; + Ok(()) } diff --git a/helm/atoma-proxy/Chart.lock b/helm/atoma-proxy/Chart.lock new file mode 100644 index 00000000..acf72f30 --- /dev/null +++ b/helm/atoma-proxy/Chart.lock @@ -0,0 +1,18 @@ +dependencies: +- name: postgresql + repository: https://charts.bitnami.com/bitnami + version: 12.12.10 +- name: prometheus + repository: https://prometheus-community.github.io/helm-charts + version: 15.18.0 +- name: grafana + repository: https://grafana.github.io/helm-charts + version: 6.61.2 +- name: loki + repository: https://grafana.github.io/helm-charts + version: 5.48.0 +- name: tempo + repository: https://grafana.github.io/helm-charts + version: 1.21.1 +digest: sha256:7e74b8e8a4fcc6eeff7275adb824a0f6bb0dc434351595501ecdad2cba9b8f24 +generated: "2025-05-14T11:34:17.060119-05:00" diff --git a/helm/atoma-proxy/Chart.yaml b/helm/atoma-proxy/Chart.yaml new file 
mode 100644 index 00000000..14498b75 --- /dev/null +++ b/helm/atoma-proxy/Chart.yaml @@ -0,0 +1,29 @@ +apiVersion: v2 +name: atoma-proxy +description: A Helm chart for Atoma Proxy +type: application +version: 0.1.0 +appVersion: "1.0.0" +maintainers: + - name: Atoma Network +dependencies: + - name: postgresql + version: "12.x.x" + repository: https://charts.bitnami.com/bitnami + condition: postgresql.enabled + - name: prometheus + version: "15.x.x" + repository: https://prometheus-community.github.io/helm-charts + condition: prometheus.enabled + - name: grafana + version: "6.x.x" + repository: https://grafana.github.io/helm-charts + condition: grafana.enabled + - name: loki + version: "5.x.x" + repository: https://grafana.github.io/helm-charts + condition: loki.enabled + - name: tempo + version: "1.x.x" + repository: https://grafana.github.io/helm-charts + condition: tempo.enabled diff --git a/helm/atoma-proxy/README.md b/helm/atoma-proxy/README.md new file mode 100644 index 00000000..1d435dc6 --- /dev/null +++ b/helm/atoma-proxy/README.md @@ -0,0 +1,417 @@ +# Atoma Proxy Helm Chart + +This Helm chart deploys the Atoma Proxy application along with its dependencies, including PostgreSQL, Prometheus, Grafana, Loki, and Tempo. + +## Prerequisites + +- Kubernetes cluster (v1.19+) +- Helm 3.x +- cert-manager (for SSL certificates) +- nginx-ingress-controller +- kubectl configured to communicate with your cluster + +## Installation + +### Step-by-Step Installation + +1. **Start Minikube** (if using Minikube): + ```bash + minikube start -p atoma-proxy \ + --driver=docker \ + --cpus=4 \ + --memory=8g \ + --disk-size=50g \ + --force \ + --addons=ingress,metrics-server + ``` + +2. **Create the namespace**: + ```bash + kubectl create namespace atoma-proxy + ``` + +3. 
**Add required Helm repositories**: + ```bash + helm repo add bitnami https://charts.bitnami.com/bitnami + helm repo add prometheus-community https://prometheus-community.github.io/helm-charts + helm repo add grafana https://grafana.github.io/helm-charts + helm repo update + ``` + +4. **Navigate to the Helm chart directory and build dependencies**: + ```bash + cd helm/atoma-proxy + helm dependency build + ``` + +5. **Install the chart with development values**: + ```bash + helm install atoma-proxy . -f values-dev.yaml -n atoma-proxy + ``` + +6. **Start Minikube tunnel** (in a separate terminal) for ingress access: + ```bash + minikube tunnel -p atoma-proxy + ``` + +7. **Verify the installation**: + ```bash + # Check all pods are running + kubectl get pods -n atoma-proxy + + # Check ingress resources + kubectl get ingress -n atoma-proxy + + # Check services + kubectl get svc -n atoma-proxy + ``` + +### Quick Installation + +Alternatively, you can use the deployment script:```bash +./scripts/deploy.sh -e dev -p your-password +``` + +## Configuration + +The chart can be configured using values files or command-line arguments. 
The main configuration files are: + +- `values.yaml`: Default configuration +- `values-dev.yaml`: Development environment configuration +- `values-prod.yaml`: Production environment configuration + +### Key Configuration Parameters + +#### Main Application +```yaml +atomaProxy: + image: + repository: ghcr.io/atoma-network/atoma-proxy + tag: latest + replicas: 1 + resources: + requests: + memory: "512Mi" + cpu: "250m" + limits: + memory: "1Gi" + cpu: "500m" +``` + +#### PostgreSQL +```yaml +postgresql: + enabled: true + auth: + database: atoma_proxy + username: atoma_proxy + password: "" # Set via --set or secrets management + primary: + persistence: + size: 10Gi +``` + +#### Monitoring Stack +```yaml +prometheus: + enabled: true + server: + persistentVolume: + size: 10Gi + +grafana: + enabled: true + persistence: + size: 10Gi + admin: + password: "" # Set via --set or secrets management + +loki: + enabled: true + persistence: + size: 10Gi + +tempo: + enabled: true + persistence: + size: 10Gi +``` + +## Deployment Script + +The `scripts/deploy.sh` script provides an easy way to deploy the application: + +```bash +./scripts/deploy.sh [options] + +Options: + -e, --env ENV Environment to deploy (dev/prod) [default: dev] + -n, --namespace NS Kubernetes namespace [default: atoma-proxy-{env}] + -r, --release NAME Helm release name [default: atoma-proxy-{env}] + -p, --password PASS Password for PostgreSQL and Grafana + -h, --help Show this help message +``` + +### Deployment configuration + +You need to ensure there is a `files` folder on the server you which to deploy to, located at `atoma-proxy/helm/atoma-proxy/files` which contains the `config.toml` and `sui_config` folder, as this is what the [sui-config-configmap.yaml](./templates/sui-config-configmap.yaml) and + +``` +helm/atoma-proxy/ +├── files/ +│ ├── config.toml # Your main config file +│ └── sui_config/ # Your Sui config files +│ ├── client.yaml +│ ├── sui.keystore +│ └── sui.aliases +``` + +## 
Environment-Specific Configurations + +### Development +- Uses staging SSL certificates +- Lower resource limits +- Debug logging +- Single replica +- Smaller storage volumes +- Dev-specific hostnames + +### Production +- Uses production SSL certificates +- Higher resource limits +- Info logging +- Multiple replicas +- Larger storage volumes +- Production hostnames +- Secure password management + +## Monitoring and Logging + +The chart includes a complete monitoring stack: + +- **Prometheus**: Metrics collection +- **Grafana**: Metrics visualization +- **Loki**: Log aggregation +- **Tempo**: Distributed tracing + +Access the monitoring tools at: +- Grafana: `https://grafana.{domain}` +- Prometheus: `https://prometheus.{domain}` +- Loki: `https://loki.{domain}` +- Tempo: `https://tempo.{domain}` + +## Security Considerations + +1. **Secret Management**: Always use Helm for managing secrets instead of direct kubectl commands. This ensures consistent deployment and better security practices: + + ```bash + # Update secrets using Helm + helm upgrade atoma-proxy-dev . \ + -n atoma-proxy-dev \ + -f values-dev.yaml \ + --set postgresql.auth.password=your-secure-password \ + --set grafana.admin.password=your-secure-password \ + --set openRouter.apiKey=your-api-key + + # For production environments + helm upgrade atoma-proxy-prod . \ + -n atoma-proxy-prod \ + -f values-prod.yaml \ + --set postgresql.auth.password=your-secure-password \ + --set grafana.admin.password=your-secure-password \ + --set openRouter.apiKey=your-api-key + ``` + + > **Important**: Never store sensitive values in values files. Always use `--set` or `--set-file` flags with Helm commands to manage secrets. + +2. **SSL Certificates**: The chart uses cert-manager for SSL certificate management: + - Development: `letsencrypt-staging` + - Production: `letsencrypt-prod` + +3. **Resource Limits**: Adjust resource limits based on your cluster capacity and application needs. + +## Troubleshooting + +1. 
**Pod Issues**: + ```bash + kubectl get pods -n atoma-proxy-{env} + kubectl describe pod -n atoma-proxy-{env} + kubectl logs -n atoma-proxy-{env} + ``` + +2. **Ingress Issues**: + ```bash + kubectl get ingress -n atoma-proxy-{env} + kubectl describe ingress -n atoma-proxy-{env} + ``` + +3. **Database Issues**: + ```bash + kubectl get pods -n atoma-proxy-{env} -l app.kubernetes.io/name=postgresql + kubectl logs -n atoma-proxy-{env} + ``` + +## Maintenance + +1. **Updating the Chart and Secrets**: + ```bash + # Update chart and secrets for development + helm upgrade atoma-proxy-dev . \ + -n atoma-proxy-dev \ + -f values-dev.yaml \ + --set postgresql.auth.password=your-secure-password \ + --set grafana.admin.password=your-secure-password + + # Update chart and secrets for production + helm upgrade atoma-proxy-prod . \ + -n atoma-proxy-prod \ + -f values-prod.yaml \ + --set postgresql.auth.password=your-secure-password \ + --set grafana.admin.password=your-secure-password + ``` + +2. **Backup**: + - PostgreSQL data is stored in persistent volumes + - Regular backups should be configured for production + - Use Helm to manage backup configurations: + ```bash + helm upgrade atoma-proxy-prod . \ + -n atoma-proxy-prod \ + -f values-prod.yaml \ + --set postgresql.backup.enabled=true \ + --set postgresql.backup.schedule="0 0 * * *" + ``` + +3. **Scaling**: + - Use Helm to adjust replicas and resources: + ```bash + helm upgrade atoma-proxy-prod . \ + -n atoma-proxy-prod \ + -f values-prod.yaml \ + --set atomaProxy.replicas=3 \ + --set atomaProxy.resources.limits.memory=2Gi + ``` + +## Contributing + +1. Fork the repository +2. Create a feature branch +3. Make your changes +4. Submit a pull request + +## License + +This chart is licensed under the same license as the Atoma Proxy application. + +## Quickstart Checklist (including Apple Silicon/M1/M2/M3) + +1. 
**Build and Push a Multi-Arch Image (if on Apple Silicon):** + ```bash + docker buildx build --platform linux/amd64,linux/arm64 -t ghcr.io/atoma-network/atoma-proxy:latest --push . + ``` + > If you are on an M1/M2/M3 Mac, you must use a multi-arch image or an arm64 image. Otherwise, your pod will fail with `rosetta error: failed to open elf at /lib64/ld-linux-x86-64.so.2`. + +2. **Start Minikube:** + ```bash + minikube start -p atoma-proxy \ + --driver=docker \ + --cpus=4 \ + --memory=8g \ + --disk-size=50g \ + --force \ + --addons=ingress,metrics-server + ``` + +3. **Create the namespace:** + ```bash + kubectl create namespace atoma-proxy + ``` + +4. **Add required Helm repositories:** + ```bash + helm repo add bitnami https://charts.bitnami.com/bitnami + helm repo add prometheus-community https://prometheus-community.github.io/helm-charts + helm repo add grafana https://grafana.github.io/helm-charts + helm repo update + ``` + +5. **Build Helm dependencies:** + ```bash + cd helm/atoma-proxy + helm dependency build + ``` + +6. **Install the chart with development values:** + ```bash + helm install atoma-proxy . -f values-dev.yaml -n atoma-proxy + ``` + +7. **Start Minikube tunnel (for ingress):** + ```bash + minikube tunnel -p atoma-proxy + ``` + +8. **Verify all pods are running:** + ```bash + kubectl get pods -n atoma-proxy + ``` + +9. **If using OTEL Collector, ensure the deployment and service are present:** + ```bash + kubectl get deployment,svc -n atoma-proxy | grep otel-collector + ``` + +10. **If Atoma Proxy pod is not running, check logs and describe:** + ```bash + kubectl logs -n atoma-proxy -l app.kubernetes.io/name=atoma-proxy --tail=50 + kubectl describe pod -n atoma-proxy -l app.kubernetes.io/name=atoma-proxy + ``` + +11. **If you see `ImagePullBackOff`, ensure your image tag is correct and public, or use an image pull secret for private images.** + +12. 
**If you see `rosetta error: failed to open elf at /lib64/ld-linux-x86-64.so.2`, you need a multi-arch image. See step 1.** + +13. **Check Prometheus targets:** + ```bash + kubectl port-forward svc/atoma-proxy-prometheus-server 9090:80 -n atoma-proxy + # Then open http://localhost:9090/targets in your browser + ``` + +14. **Check for Atoma metrics in Prometheus/Grafana:** + - In Prometheus or Grafana, search for metrics with the prefix `atoma_`. + +--- + +## Debugging & Verification + +- **Check all pods in the namespace:** + ```bash + kubectl get pods -n atoma-proxy + ``` +- **Check logs for a specific pod:** + ```bash + kubectl logs -n atoma-proxy + ``` +- **Describe a pod for events and status:** + ```bash + kubectl describe pod -n atoma-proxy + ``` +- **Check OTEL Collector deployment and service:** + ```bash + kubectl get deployment,svc -n atoma-proxy | grep otel-collector + ``` +- **Check if Atoma Proxy pod is running:** + ```bash + kubectl get pods -n atoma-proxy -l app.kubernetes.io/name=atoma-proxy + ``` +- **Check for image pull errors:** + ```bash + kubectl describe pod -n atoma-proxy -l app.kubernetes.io/name=atoma-proxy + ``` +- **Check Prometheus targets:** + ```bash + kubectl port-forward svc/atoma-proxy-prometheus-server 9090:80 -n atoma-proxy + # Open http://localhost:9090/targets + ``` +- **Check for Atoma metrics in Prometheus/Grafana:** + - In Prometheus or Grafana, search for metrics with the prefix `atoma_`. diff --git a/helm/atoma-proxy/scripts/deploy.sh b/helm/atoma-proxy/scripts/deploy.sh new file mode 100755 index 00000000..56e1594b --- /dev/null +++ b/helm/atoma-proxy/scripts/deploy.sh @@ -0,0 +1,186 @@ +#!/bin/bash + +set -e + +# Default values +ENV="dev" +NAMESPACE="" +VALUES_FILE="" +RELEASE_NAME="" +PASSWORD="" + +# Check for required tools +check_prerequisites() { + local missing_tools=() + + # Check for kubectl + if ! command -v kubectl &> /dev/null; then + missing_tools+=("kubectl") + fi + + # Check for helm + if ! 
command -v helm &> /dev/null; then + missing_tools+=("helm") + fi + + # If any tools are missing, show error and exit + if [ ${#missing_tools[@]} -ne 0 ]; then + echo "Error: The following required tools are not installed:" + for tool in "${missing_tools[@]}"; do + echo " - $tool" + done + echo "" + echo "Please install the missing tools and try again." + echo "You can install them using:" + echo " - kubectl: https://kubernetes.io/docs/tasks/tools/install-kubectl/" + echo " - helm: https://helm.sh/docs/intro/install/" + exit 1 + fi + + # Check kubectl configuration + echo "Checking kubectl configuration..." + + # Check if kubeconfig exists + if [ -z "$KUBECONFIG" ]; then + if [ ! -f "$HOME/.kube/config" ]; then + echo "Error: No kubeconfig file found." + echo "Please ensure you have a valid kubeconfig file at ~/.kube/config" + echo "You can get this file from your cluster administrator or cloud provider." + exit 1 + fi + fi + + # Check current context + if ! kubectl config current-context &> /dev/null; then + echo "Error: No current context set in kubectl configuration." + echo "Available contexts:" + kubectl config get-contexts + echo "" + echo "Please set a context using:" + echo " kubectl config use-context " + exit 1 + fi + + # Check cluster connection + if ! kubectl cluster-info &> /dev/null; then + echo "Error: Cannot connect to the Kubernetes cluster." + echo "Current context: $(kubectl config current-context)" + echo "" + echo "Please check:" + echo " 1. Your cluster is running" + echo " 2. Your kubeconfig file is correct" + echo " 3. You have network access to the cluster" + echo " 4. 
Your credentials are valid" + echo "" + echo "You can verify your configuration with:" + echo " kubectl config view" + echo " kubectl cluster-info" + exit 1 + fi + + echo "✓ kubectl is properly configured" +} + +# Help message +show_help() { + echo "Usage: $0 [options]" + echo "Options:" + echo " -e, --env ENV Environment to deploy (dev/prod) [default: dev]" + echo " -n, --namespace NS Kubernetes namespace [default: atoma-proxy-{env}]" + echo " -r, --release NAME Helm release name [default: atoma-proxy-{env}]" + echo " -p, --password PASS Password for PostgreSQL and Grafana" + echo " -h, --help Show this help message" + exit 1 +} + +# Parse command line arguments +while [[ $# -gt 0 ]]; do + case $1 in + -e|--env) + ENV="$2" + shift 2 + ;; + -n|--namespace) + NAMESPACE="$2" + shift 2 + ;; + -r|--release) + RELEASE_NAME="$2" + shift 2 + ;; + -p|--password) + PASSWORD="$2" + shift 2 + ;; + -h|--help) + show_help + ;; + *) + echo "Unknown option: $1" + show_help + ;; + esac +done + +# Check prerequisites +echo "Checking prerequisites..." +check_prerequisites + +# Validate environment +if [[ "$ENV" != "dev" && "$ENV" != "prod" ]]; then + echo "Error: Environment must be either 'dev' or 'prod'" + exit 1 +fi + +# Set default values if not provided +if [ -z "$NAMESPACE" ]; then + NAMESPACE="atoma-proxy-$ENV" +fi + +if [ -z "$RELEASE_NAME" ]; then + RELEASE_NAME="atoma-proxy-$ENV" +fi + +# Get the directory where the script is located +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" +CHART_DIR="$(dirname "$SCRIPT_DIR")" + +# Set values file based on environment +VALUES_FILE="$CHART_DIR/values-$ENV.yaml" + +# Check if values file exists +if [ ! -f "$VALUES_FILE" ]; then + echo "Error: Values file $VALUES_FILE not found" + exit 1 +fi + +# Add Helm repositories if not already added +echo "Adding Helm repositories..." 
+helm repo add bitnami https://charts.bitnami.com/bitnami +helm repo add prometheus-community https://prometheus-community.github.io/helm-charts +helm repo add grafana https://grafana.github.io/helm-charts +helm repo update + +# Update dependencies +echo "Updating Helm dependencies..." +cd "$CHART_DIR" +helm dependency update + +# Prepare password arguments +PASSWORD_ARGS="" +if [ ! -z "$PASSWORD" ]; then + PASSWORD_ARGS="--set postgresql.auth.password=$PASSWORD --set grafana.admin.password=$PASSWORD" +fi + +# Deploy the chart +echo "Deploying $RELEASE_NAME to namespace $NAMESPACE..." +helm upgrade --install $RELEASE_NAME . \ + --namespace $NAMESPACE \ + --create-namespace \ + -f "$VALUES_FILE" \ + $PASSWORD_ARGS + +echo "Deployment completed successfully!" +echo "Release: $RELEASE_NAME" +echo "Namespace: $NAMESPACE" +echo "Environment: $ENV" \ No newline at end of file diff --git a/helm/atoma-proxy/scripts/fix-addons.sh b/helm/atoma-proxy/scripts/fix-addons.sh new file mode 100755 index 00000000..01306a54 --- /dev/null +++ b/helm/atoma-proxy/scripts/fix-addons.sh @@ -0,0 +1,53 @@ +#!/bin/bash + +set -e + +# Function to wait for API server +wait_for_api() { + echo "Waiting for API server to be ready..." + until kubectl get nodes &>/dev/null; do + echo "Waiting for API server..." + sleep 5 + done +} + +# Function to apply manifest with retries +apply_with_retry() { + local manifest=$1 + local max_attempts=3 + local attempt=1 + + while [ $attempt -le $max_attempts ]; do + echo "Attempt $attempt of $max_attempts to apply manifest..." + if kubectl apply -f "$manifest" --validate=false; then + return 0 + fi + echo "Attempt $attempt failed, waiting before retry..." + sleep 10 + attempt=$((attempt + 1)) + done + echo "Failed to apply manifest after $max_attempts attempts" + return 1 +} + +# Wait for API server +wait_for_api + +# Enable metrics server +echo "Enabling metrics server..." 
+apply_with_retry "https://github.com/kubernetes-sigs/metrics-server/releases/latest/download/components.yaml" + +# Enable default storage class +echo "Enabling default storage class..." +apply_with_retry "https://raw.githubusercontent.com/kubernetes/minikube/master/deploy/addons/storageclass/storageclass.yaml" + +# Enable storage provisioner +echo "Enabling storage provisioner..." +apply_with_retry "https://raw.githubusercontent.com/kubernetes/minikube/master/deploy/addons/storage-provisioner/storage-provisioner.yaml" + +# Wait for metrics server to be ready +echo "Waiting for metrics server to be ready..." +kubectl wait --for=condition=ready pod -l k8s-app=metrics-server -n kube-system --timeout=300s || true + +echo "Addons have been installed. Please verify their status with:" +echo "kubectl get pods -n kube-system" \ No newline at end of file diff --git a/helm/atoma-proxy/scripts/fix-rbac.sh b/helm/atoma-proxy/scripts/fix-rbac.sh new file mode 100755 index 00000000..4f2a1eb2 --- /dev/null +++ b/helm/atoma-proxy/scripts/fix-rbac.sh @@ -0,0 +1,50 @@ +#!/bin/bash + +set -e + +# Wait for API server to be ready +echo "Waiting for API server to be ready..." +until kubectl get nodes &>/dev/null; do + echo "Waiting for API server..." + sleep 5 +done + +# Create ClusterRole for namespace controller +echo "Creating ClusterRole..." +kubectl apply -f - --validate=false < /dev/null; then + missing_tools+=("kubectl") + fi + + # Check for helm + if ! command -v helm &> /dev/null; then + missing_tools+=("helm") + fi + + # Check for cluster-specific tools + case $CLUSTER_TYPE in + minikube) + if ! command -v minikube &> /dev/null; then + missing_tools+=("minikube") + fi + ;; + kind) + if ! command -v kind &> /dev/null; then + missing_tools+=("kind") + fi + ;; + eks) + if ! command -v aws &> /dev/null; then + missing_tools+=("aws-cli") + fi + if ! command -v eksctl &> /dev/null; then + missing_tools+=("eksctl") + fi + ;; + gke) + if ! 
command -v gcloud &> /dev/null; then + missing_tools+=("gcloud") + fi + ;; + aks) + if ! command -v az &> /dev/null; then + missing_tools+=("azure-cli") + fi + ;; + esac + + # If any tools are missing, show error and exit + if [ ${#missing_tools[@]} -ne 0 ]; then + echo "Error: The following required tools are not installed:" + for tool in "${missing_tools[@]}"; do + echo " - $tool" + done + echo "" + echo "Please install the missing tools and try again." + echo "You can install them using:" + echo " - kubectl: https://kubernetes.io/docs/tasks/tools/install-kubectl/" + echo " - helm: https://helm.sh/docs/intro/install/" + echo " - minikube: https://minikube.sigs.k8s.io/docs/start/" + echo " - kind: https://kind.sigs.k8s.io/docs/user/quick-start/#installation" + echo " - aws-cli: https://aws.amazon.com/cli/" + echo " - eksctl: https://eksctl.io/introduction/installation/" + echo " - gcloud: https://cloud.google.com/sdk/docs/install" + echo " - azure-cli: https://docs.microsoft.com/en-us/cli/azure/install-azure-cli" + exit 1 + fi +} + +# Setup local development cluster +setup_local_cluster() { + case $CLUSTER_TYPE in + minikube) + echo "Setting up Minikube cluster..." + # Start minikube with profile name instead of --name flag + minikube start -p $CLUSTER_NAME --driver=docker --cpus=4 --memory=8g --disk-size=50g + + # Wait for minikube to be ready + echo "Waiting for minikube to be ready..." + minikube status -p $CLUSTER_NAME + + # Enable addons + echo "Enabling minikube addons..." + minikube addons enable ingress -p $CLUSTER_NAME + minikube addons enable metrics-server -p $CLUSTER_NAME + + # Update kubeconfig + echo "Updating kubeconfig..." + minikube update-context -p $CLUSTER_NAME + + # Set up kubectl to use minikube + echo "Setting up kubectl configuration..." + export KUBECONFIG=~/.kube/config + minikube kubectl -- config use-context $CLUSTER_NAME + + # Verify the context + if ! 
kubectl config current-context | grep -q "$CLUSTER_NAME"; then + echo "Error: Failed to set minikube context" + exit 1 + fi + + # Test kubectl connection + echo "Testing kubectl connection..." + kubectl cluster-info + ;; + kind) + echo "Setting up Kind cluster..." + cat < /dev/null; do + echo "Still waiting for ingress-nginx namespace to be deleted..." + sleep 5 + done + + # Additional cleanup of any remaining resources + echo "Cleaning up any remaining ingress-nginx resources..." + kubectl delete clusterrole ingress-nginx --ignore-not-found + kubectl delete clusterrolebinding ingress-nginx --ignore-not-found + kubectl delete serviceaccount ingress-nginx -n ingress-nginx --ignore-not-found + kubectl delete service ingress-nginx-controller -n ingress-nginx --ignore-not-found + kubectl delete service ingress-nginx-controller-admission -n ingress-nginx --ignore-not-found + kubectl delete deployment ingress-nginx-controller -n ingress-nginx --ignore-not-found + kubectl delete validatingwebhookconfiguration ingress-nginx-admission --ignore-not-found + kubectl delete ingressclass nginx --ignore-not-found + kubectl delete ingressclass nginx-ingress --ignore-not-found + kubectl delete ingressclass ingress-nginx --ignore-not-found + + # Install nginx-ingress + echo "Installing nginx-ingress..." + helm repo add ingress-nginx https://kubernetes.github.io/ingress-nginx + helm repo update + helm install ingress-nginx ingress-nginx/ingress-nginx \ + --namespace ingress-nginx \ + --create-namespace \ + --set controller.service.type=LoadBalancer \ + --set controller.service.externalTrafficPolicy=Local \ + --set controller.service.enableHttps=false \ + --set controller.ingressClassResource.name=nginx \ + --set controller.ingressClassResource.enabled=true \ + --set controller.ingressClassResource.default=true + + # Install metrics-server if not using minikube + if [ "$CLUSTER_TYPE" != "minikube" ]; then + echo "Installing metrics-server..." 
+ kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/latest/download/components.yaml + fi +} + +# Main execution +echo "Starting cluster setup..." + +# Check prerequisites +check_prerequisites + +# Setup cluster +if [[ "$CLUSTER_TYPE" == "minikube" || "$CLUSTER_TYPE" == "kind" ]]; then + setup_local_cluster +else + setup_cloud_cluster +fi + +# Install components +install_components + +echo "Cluster setup completed successfully!" +echo "Cluster type: $CLUSTER_TYPE" +echo "Cluster name: $CLUSTER_NAME" +echo "" +echo "You can now deploy the application using:" +echo "./deploy.sh -e dev -p your-password" \ No newline at end of file diff --git a/helm/atoma-proxy/templates/configmap.yaml b/helm/atoma-proxy/templates/configmap.yaml new file mode 100644 index 00000000..d62601c2 --- /dev/null +++ b/helm/atoma-proxy/templates/configmap.yaml @@ -0,0 +1,14 @@ +# helm/atoma-proxy/templates/configmap.yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Release.Name }}-config + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: {{ .Release.Name }} + app.kubernetes.io/instance: {{ .Release.Name }} +data: + config.toml: |- +{{ .Files.Get "files/config.toml" | indent 4 }} + environment: {{ .Values.atomaProxy.config.environment | quote }} + log_level: {{ .Values.atomaProxy.config.logLevel | quote }} \ No newline at end of file diff --git a/helm/atoma-proxy/templates/deployment.yaml b/helm/atoma-proxy/templates/deployment.yaml new file mode 100644 index 00000000..ffd43119 --- /dev/null +++ b/helm/atoma-proxy/templates/deployment.yaml @@ -0,0 +1,96 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Release.Name }} + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: {{ .Release.Name }} + app.kubernetes.io/instance: {{ .Release.Name }} +spec: + replicas: {{ .Values.atomaProxy.replicas }} + selector: + matchLabels: + app.kubernetes.io/name: {{ .Release.Name }} + app.kubernetes.io/instance: {{ 
.Release.Name }} + template: + metadata: + labels: + app.kubernetes.io/name: {{ .Release.Name }} + app.kubernetes.io/instance: {{ .Release.Name }} + spec: + initContainers: + - name: init-sui-config + image: busybox + command: + - sh + - -c + - | + mkdir -p /root/.sui/sui_config + cp /tmp-sui-config/* /root/.sui/sui_config/ + chmod -R 777 /root/.sui/sui_config + volumeMounts: + - name: tmp-sui-config + mountPath: /tmp-sui-config + readOnly: true + - name: sui-config-volume + mountPath: /root/.sui/sui_config + containers: + - name: {{ .Chart.Name }} + image: "{{ .Values.atomaProxy.image.repository }}:{{ .Values.atomaProxy.image.tag }}" + imagePullPolicy: {{ .Values.atomaProxy.image.pullPolicy }} + ports: + - name: http + containerPort: {{ .Values.atomaProxy.service.http.port }} + - name: credentials + containerPort: {{ .Values.atomaProxy.service.credentials.port }} + - name: p2p + containerPort: {{ .Values.atomaProxy.service.p2p.port }} + env: + - name: RUST_LOG + value: {{ .Values.atomaProxy.config.logLevel | quote }} + - name: OTEL_EXPORTER_OTLP_ENDPOINT + value: "http://{{ .Release.Name }}-otel-collector:4317" + volumeMounts: + - name: config + mountPath: /app/config.toml + subPath: config.toml + - name: open-router + mountPath: /app/open_router.json + subPath: open_router.json + - name: logs + mountPath: /app/logs + - name: data + mountPath: /app/data + - name: sui-config-volume + mountPath: /root/.sui/sui_config + resources: + {{- toYaml .Values.atomaProxy.resources | nindent 12 }} + livenessProbe: + httpGet: + path: /health + port: http + initialDelaySeconds: 30 + periodSeconds: 10 + readinessProbe: + httpGet: + path: /health + port: http + initialDelaySeconds: 5 + periodSeconds: 5 + volumes: + - name: config + configMap: + name: {{ .Release.Name }}-config + - name: open-router + secret: + secretName: {{ .Release.Name }}-open-router + - name: logs + emptyDir: {} + - name: data + emptyDir: {} + - name: tmp-sui-config + configMap: + name: {{ .Release.Name 
}}-sui-config + defaultMode: 0644 + - name: sui-config-volume + emptyDir: {} \ No newline at end of file diff --git a/helm/atoma-proxy/templates/ingress.yaml b/helm/atoma-proxy/templates/ingress.yaml new file mode 100644 index 00000000..23e1cf8d --- /dev/null +++ b/helm/atoma-proxy/templates/ingress.yaml @@ -0,0 +1,43 @@ +{{- if .Values.atomaProxy.ingress.enabled -}} +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: {{ .Release.Name }}-ingress + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: {{ .Release.Name }} + app.kubernetes.io/instance: {{ .Release.Name }} + {{- with .Values.atomaProxy.ingress.annotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +spec: + ingressClassName: {{ .Values.atomaProxy.ingress.className }} + tls: + - hosts: + {{- range .Values.atomaProxy.ingress.hosts }} + - {{ .host }} + {{- end }} + secretName: {{ .Release.Name }}-tls + rules: + {{- range .Values.atomaProxy.ingress.hosts }} + - host: {{ .host }} + http: + paths: + {{- range .paths }} + - path: {{ .path }} + pathType: {{ .pathType }} + backend: + service: + name: {{ $.Release.Name }} + port: + {{- if eq .service "http" }} + name: http + {{- else if eq .service "credentials" }} + name: credentials + {{- else if eq .service "prover" }} + number: 8080 + {{- end }} + {{- end }} + {{- end }} +{{- end }} \ No newline at end of file diff --git a/helm/atoma-proxy/templates/otel-collector-configmap.yaml b/helm/atoma-proxy/templates/otel-collector-configmap.yaml new file mode 100644 index 00000000..c2dc234c --- /dev/null +++ b/helm/atoma-proxy/templates/otel-collector-configmap.yaml @@ -0,0 +1,40 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: otel-collector-config + namespace: {{ .Release.Namespace }} +data: + otel-collector-config.yaml: | + receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + processors: + batch: + exporters: + prometheus: + endpoint: "0.0.0.0:8889" + namespace: 
"atoma" + loki: + endpoint: http://loki:3100/loki/api/v1/push + otlp/tempo: + endpoint: tempo:4317 + tls: + insecure: true + service: + pipelines: + metrics: + receivers: [otlp] + processors: [batch] + exporters: [prometheus] + traces: + receivers: [otlp] + processors: [batch] + exporters: [otlp/tempo] + logs: + receivers: [otlp] + processors: [batch] + exporters: [loki] diff --git a/helm/atoma-proxy/templates/otel-collector-deployment.yaml b/helm/atoma-proxy/templates/otel-collector-deployment.yaml new file mode 100644 index 00000000..c03aa97b --- /dev/null +++ b/helm/atoma-proxy/templates/otel-collector-deployment.yaml @@ -0,0 +1,33 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: otel-collector + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: otel-collector +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: otel-collector + template: + metadata: + labels: + app.kubernetes.io/name: otel-collector + spec: + containers: + - name: otel-collector + image: otel/opentelemetry-collector-contrib:0.119.0 + args: ["--config=/etc/otel-collector-config.yaml"] + ports: + - containerPort: 4317 + - containerPort: 4318 + - containerPort: 8889 + volumeMounts: + - name: otel-collector-config + mountPath: /etc/otel-collector-config.yaml + subPath: otel-collector-config.yaml + volumes: + - name: otel-collector-config + configMap: + name: otel-collector-config diff --git a/helm/atoma-proxy/templates/otel-collector-service.yaml b/helm/atoma-proxy/templates/otel-collector-service.yaml new file mode 100644 index 00000000..f704c47a --- /dev/null +++ b/helm/atoma-proxy/templates/otel-collector-service.yaml @@ -0,0 +1,20 @@ +apiVersion: v1 +kind: Service +metadata: + name: otel-collector + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: otel-collector +spec: + selector: + app.kubernetes.io/name: otel-collector + ports: + - name: prometheus + port: 8889 + targetPort: 8889 + - name: otlp-grpc + port: 4317 + 
targetPort: 4317 + - name: otlp-http + port: 4318 + targetPort: 4318 diff --git a/helm/atoma-proxy/templates/secrets.yaml b/helm/atoma-proxy/templates/secrets.yaml new file mode 100644 index 00000000..83b16214 --- /dev/null +++ b/helm/atoma-proxy/templates/secrets.yaml @@ -0,0 +1,24 @@ +apiVersion: v1 +kind: Secret +metadata: + name: grafana-admin + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: {{ .Release.Name }} + app.kubernetes.io/instance: {{ .Release.Name }} +type: Opaque +data: + admin-user: {{ .Values.grafana.adminUser | default "admin" | b64enc }} + admin-password: {{ .Values.grafana.adminPassword | default "admin" | b64enc }} +--- +apiVersion: v1 +kind: Secret +metadata: + name: {{ .Release.Name }}-open-router + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: {{ .Release.Name }} + app.kubernetes.io/instance: {{ .Release.Name }} +type: Opaque +data: + open_router.json: {{ .Values.openRouter.apiKey | default "your-api-key" | toJson | b64enc }} \ No newline at end of file diff --git a/helm/atoma-proxy/templates/service.yaml b/helm/atoma-proxy/templates/service.yaml new file mode 100644 index 00000000..ca04b2d1 --- /dev/null +++ b/helm/atoma-proxy/templates/service.yaml @@ -0,0 +1,23 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ .Release.Name }} + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: {{ .Release.Name }} + app.kubernetes.io/instance: {{ .Release.Name }} +spec: + selector: + app.kubernetes.io/name: {{ .Release.Name }} + app.kubernetes.io/instance: {{ .Release.Name }} + ports: + - name: http + port: {{ .Values.atomaProxy.service.http.port }} + targetPort: {{ .Values.atomaProxy.service.http.port }} + - name: credentials + port: {{ .Values.atomaProxy.service.credentials.port }} + targetPort: {{ .Values.atomaProxy.service.credentials.port }} + - name: p2p + port: {{ .Values.atomaProxy.service.p2p.port }} + targetPort: {{ .Values.atomaProxy.service.p2p.port }} + type: 
ClusterIP diff --git a/helm/atoma-proxy/templates/sui-config-configmap.yaml b/helm/atoma-proxy/templates/sui-config-configmap.yaml new file mode 100644 index 00000000..8a24c866 --- /dev/null +++ b/helm/atoma-proxy/templates/sui-config-configmap.yaml @@ -0,0 +1,16 @@ +# helm/atoma-proxy/templates/sui-config-configmap.yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Release.Name }}-sui-config + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: {{ .Release.Name }} + app.kubernetes.io/instance: {{ .Release.Name }} +data: + client.yaml: |- +{{ .Files.Get "files/sui_config/client.yaml" | indent 4 }} + sui.keystore: |- +{{ .Files.Get "files/sui_config/sui.keystore" | indent 4 }} + sui.aliases: |- +{{ .Files.Get "files/sui_config/sui.aliases" | indent 4 }} \ No newline at end of file diff --git a/helm/atoma-proxy/values-dev.yaml b/helm/atoma-proxy/values-dev.yaml new file mode 100644 index 00000000..98d30d0f --- /dev/null +++ b/helm/atoma-proxy/values-dev.yaml @@ -0,0 +1,155 @@ +# Development environment settings +global: + environment: development + domain: atoma.network + +atomaProxy: + image: + repository: ghcr.io/atoma-network/atoma-proxy + tag: latest + pullPolicy: Always + replicas: 1 + resources: + limits: + cpu: 500m + memory: 1Gi + requests: + cpu: 200m + memory: 512Mi + ingress: + enabled: true + className: "nginx" + annotations: + cert-manager.io/cluster-issuer: "letsencrypt-staging" + nginx.ingress.kubernetes.io/ssl-redirect: "true" + hosts: + - host: api-dev.atoma.network + paths: + - path: / + pathType: Prefix + service: http + - host: credentials-dev.atoma.network + paths: + - path: / + pathType: Prefix + service: credentials + - host: prover-dev.atoma.network + paths: + - path: / + pathType: Prefix + service: prover + config: + logLevel: "debug" + environment: "development" + extraEnv: + - name: OTEL_EXPORTER_OTLP_ENDPOINT + value: http://otel-collector:4317 + +# PostgreSQL settings +postgresql: + enabled: true + auth: + 
database: atoma_proxy_dev + username: atoma_proxy_dev + password: "dev_password" # Change this in production + primary: + persistence: + size: 5Gi + service: + port: 5432 + +# Monitoring stack settings +prometheus: + enabled: true + server: + persistentVolume: + size: 5Gi + ingress: + enabled: true + className: "nginx" + annotations: + cert-manager.io/cluster-issuer: "letsencrypt-staging" + nginx.ingress.kubernetes.io/ssl-redirect: "true" + hosts: + - prometheus-dev.atoma.network + service: + type: LoadBalancer + servicePort: 9090 + targetPort: 9090 + +grafana: + enabled: true + persistence: + size: 5Gi + admin: + existingSecret: "grafana-admin" + userKey: admin-user + passwordKey: admin-password + security: + adminPassword: "admin123" + ingress: + enabled: true + hosts: + - grafana-dev.atoma.network + service: + type: LoadBalancer + port: 3000 + targetPort: 3000 + annotations: + metallb.universe.tf/address-pool: grafana-pool + +loki: # Fixed typo: was "oki" + enabled: false + deploymentMode: SingleBinary + auth_enabled: false + + # Remove the storage.bucketNames section entirely + # Use raw config instead + config: | + auth_enabled: false + server: + http_listen_port: 3100 + common: + path_prefix: /var/loki + replication_factor: 1 + ring: + kvstore: + store: inmemory + schema_config: + configs: + - from: 2020-10-24 + store: boltdb-shipper + object_store: filesystem + schema: v11 + index: + prefix: index_ + period: 24h + storage_config: + boltdb_shipper: + active_index_directory: /var/loki/index + shared_store: filesystem + filesystem: + directory: /var/loki/chunks + + singleBinary: + replicas: 1 + persistence: + enabled: true + size: 10Gi + storageClass: local-path + + write: + replicas: 0 + read: + replicas: 0 + backend: + replicas: 0 + +tempo: + enabled: true + persistence: + size: 5Gi + ingress: + enabled: true + hosts: + - tempo-dev.atoma.network diff --git a/helm/atoma-proxy/values-prod.yaml b/helm/atoma-proxy/values-prod.yaml new file mode 100644 index 
00000000..16bef508 --- /dev/null +++ b/helm/atoma-proxy/values-prod.yaml @@ -0,0 +1,95 @@ +# Production environment settings +global: + environment: production + domain: atoma.network + +atomaProxy: + image: + repository: ghcr.io/atoma-network/atoma-proxy + tag: latest + pullPolicy: IfNotPresent + replicas: 3 + resources: + requests: + memory: "1Gi" + cpu: "500m" + limits: + memory: "2Gi" + cpu: "1000m" + ingress: + enabled: true + className: "nginx" + annotations: + cert-manager.io/cluster-issuer: "letsencrypt-prod" + nginx.ingress.kubernetes.io/ssl-redirect: "true" + hosts: + - host: api.atoma.network + paths: + - path: / + pathType: Prefix + service: http + - host: credentials.atoma.network + paths: + - path: / + pathType: Prefix + service: credentials + - host: prover.atoma.network + paths: + - path: / + pathType: Prefix + service: prover + config: + logLevel: "info" + environment: "production" + +# PostgreSQL settings +postgresql: + enabled: true + auth: + database: atoma_proxy + username: atoma_proxy + password: "" # Set this via --set or secrets management + primary: + persistence: + size: 50Gi + service: + port: 5432 + +# Monitoring stack settings +prometheus: + enabled: true + server: + persistentVolume: + size: 50Gi + alertmanager: + persistentVolume: + size: 10Gi + +grafana: + enabled: true + persistence: + size: 20Gi + admin: + password: "" # Set this via --set or secrets management + ingress: + enabled: true + hosts: + - grafana.atoma.network + +loki: + enabled: true + persistence: + size: 50Gi + ingress: + enabled: true + hosts: + - loki.atoma.network + +tempo: + enabled: true + persistence: + size: 50Gi + ingress: + enabled: true + hosts: + - tempo.atoma.network diff --git a/helm/atoma-proxy/values.yaml b/helm/atoma-proxy/values.yaml new file mode 100644 index 00000000..68860cee --- /dev/null +++ b/helm/atoma-proxy/values.yaml @@ -0,0 +1,141 @@ +# Global settings +global: + environment: production + domain: atoma.network + +# Main application 
settings +atomaProxy: + image: + repository: ghcr.io/atoma-network/atoma-proxy + tag: latest + pullPolicy: IfNotPresent + replicas: 1 + resources: + requests: + memory: "512Mi" + cpu: "250m" + limits: + memory: "1Gi" + cpu: "500m" + service: + http: + port: 8080 + credentials: + port: 8081 + p2p: + port: 8083 + ingress: + enabled: true + className: "nginx" + annotations: + cert-manager.io/cluster-issuer: "letsencrypt-prod" + nginx.ingress.kubernetes.io/ssl-redirect: "true" + hosts: + - host: api.atoma.network + paths: + - path: / + pathType: Prefix + service: http + - host: credentials.atoma.network + paths: + - path: / + pathType: Prefix + service: credentials + - host: prover.atoma.network + paths: + - path: / + pathType: Prefix + service: prover + config: + logLevel: "info" + environment: "production" + +# PostgreSQL settings +postgresql: + enabled: true + auth: + database: atoma_proxy + username: atoma_proxy + password: "" + primary: + persistence: + size: 10Gi + service: + port: 5432 + +# Monitoring stack settings +prometheus: + enabled: true + server: + persistentVolume: + size: 10Gi + alertmanager: + persistentVolume: + size: 2Gi + +grafana: + adminUser: admin + adminPassword: admin + enabled: true + persistence: + size: 10Gi + ingress: + enabled: true + hosts: + - grafana.atoma.network + +openRouter: + apiKey: your-api-key + +loki: + enabled: true + persistence: + size: 10Gi + ingress: + enabled: true + hosts: + - loki.atoma.network + +tempo: + enabled: true + persistence: + size: 10Gi + ingress: + enabled: true + hosts: + - tempo.atoma.network + +# OpenTelemetry Collector settings +otelCollector: + enabled: true + config: + receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + processors: + batch: {} + exporters: + prometheus: + endpoint: 0.0.0.0:8889 + loki: + endpoint: http://loki:3100/loki/api/v1/push + otlp: + endpoint: tempo:4317 + tls: + insecure: true + service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters:
[otlp] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [prometheus] + logs: + receivers: [otlp] + processors: [batch] + exporters: [loki] diff --git a/helm/infrastructure/metallb-config.yaml b/helm/infrastructure/metallb-config.yaml new file mode 100644 index 00000000..0f93290f --- /dev/null +++ b/helm/infrastructure/metallb-config.yaml @@ -0,0 +1,32 @@ +apiVersion: metallb.io/v1beta1 +kind: IPAddressPool +metadata: + name: traefik-pool + namespace: metallb-system +spec: + addresses: + - 10.0.235.50/32 +--- +apiVersion: metallb.io/v1beta1 +kind: IPAddressPool +metadata: + name: prometheus-pool + namespace: metallb-system +spec: + addresses: + - 10.0.235.51/32 +--- +apiVersion: metallb.io/v1beta1 +kind: L2Advertisement +metadata: + name: l2 + namespace: metallb-system +--- +apiVersion: metallb.io/v1beta1 +kind: IPAddressPool +metadata: + name: grafana-pool + namespace: metallb-system +spec: + addresses: + - 10.0.235.52/32 diff --git a/helm/infrastructure/metallb-install.yaml b/helm/infrastructure/metallb-install.yaml new file mode 100644 index 00000000..8235a01f --- /dev/null +++ b/helm/infrastructure/metallb-install.yaml @@ -0,0 +1,68 @@ +# metallb-install.yaml +# Installs MetalLB in native mode (recommended for k3s and modern clusters) +apiVersion: v1 +kind: Namespace +metadata: + name: metallb-system +--- +# Install MetalLB components (controller and speaker) +# You can always get the latest from: https://metallb.universe.tf/installation/ +# This is for v0.14.5 (as of June 2024) +apiVersion: apps/v1 +kind: Deployment +metadata: + name: controller + namespace: metallb-system +spec: + selector: + matchLabels: + app: metallb + component: controller + replicas: 1 + template: + metadata: + labels: + app: metallb + component: controller + spec: + containers: + - name: controller + image: quay.io/metallb/controller:v0.14.5 + args: + - controller + resources: + requests: + cpu: 100m + memory: 100Mi + limits: + cpu: 200m + memory: 200Mi +--- +apiVersion: 
apps/v1 +kind: DaemonSet +metadata: + name: speaker + namespace: metallb-system +spec: + selector: + matchLabels: + app: metallb + component: speaker + template: + metadata: + labels: + app: metallb + component: speaker + spec: + containers: + - name: speaker + image: quay.io/metallb/speaker:v0.14.5 + args: + - speaker + resources: + requests: + cpu: 100m + memory: 100Mi + limits: + cpu: 200m + memory: 200Mi