|
| 1 | +// Copyright (c), Mysten Labs, Inc. |
| 2 | +// SPDX-License-Identifier: Apache-2.0 |
| 3 | + |
| 4 | +//! Aggregator server for Seal committee mode. It fetches encrypted partial keys from committee |
| 5 | +//! servers, verifies and aggregates them into a single response. |
| 6 | +
|
| 7 | +#![allow(dead_code)] |
| 8 | +#![allow(unused_variables)] |
| 9 | + |
| 10 | +use anyhow::{Context, Result}; |
| 11 | +use axum::{ |
| 12 | + extract::State, |
| 13 | + http::{HeaderMap, StatusCode}, |
| 14 | + response::{IntoResponse, Response}, |
| 15 | + routing::post, |
| 16 | + Json, Router, |
| 17 | +}; |
| 18 | +use mysten_service::{get_mysten_service, package_name, package_version}; |
| 19 | +use seal_committee::{grpc_helper::create_grpc_client, move_types::PartialKeyServer, Network}; |
| 20 | +use seal_sdk::{FetchKeyRequest, FetchKeyResponse}; |
| 21 | +use serde::Deserialize; |
| 22 | +use std::env; |
| 23 | +use std::sync::Arc; |
| 24 | +use sui_rpc::client::Client as SuiGrpcClient; |
| 25 | +use sui_sdk_types::Address; |
| 26 | +use tower_http::cors::{Any, CorsLayer}; |
| 27 | +use tracing::info; |
| 28 | + |
| 29 | +/// Minimum required version for committee members' responses (matches typescript). |
| 30 | +const MIN_SERVER_VERSION: &str = ">=0.4.1"; |
| 31 | + |
| 32 | +/// Default port for aggregator server. |
| 33 | +const DEFAULT_PORT: u16 = 2027; |
| 34 | + |
| 35 | +/// Configuration for aggregator server. |
| 36 | +#[derive(Deserialize)] |
| 37 | +struct Config { |
| 38 | + network: Network, |
| 39 | + key_server_object_id: Address, |
| 40 | +} |
| 41 | + |
| 42 | +/// Application state. |
| 43 | +#[derive(Clone)] |
| 44 | +struct AppState { |
| 45 | + key_server_object_id: Address, |
| 46 | + network: Network, |
| 47 | + grpc_client: SuiGrpcClient, |
| 48 | + threshold: u16, |
| 49 | + committee_members: Arc<Vec<PartialKeyServer>>, |
| 50 | +} |
| 51 | + |
| 52 | +/// Custom error type for aggregator responses. |
| 53 | +struct AggregatorError { |
| 54 | + status: StatusCode, |
| 55 | + message: String, |
| 56 | + headers: HeaderMap, |
| 57 | +} |
| 58 | + |
| 59 | +impl IntoResponse for AggregatorError { |
| 60 | + fn into_response(self) -> Response { |
| 61 | + let mut response = (self.status, self.message).into_response(); |
| 62 | + *response.headers_mut() = self.headers; |
| 63 | + response |
| 64 | + } |
| 65 | +} |
| 66 | + |
| 67 | +#[tokio::main] |
| 68 | +async fn main() -> Result<()> { |
| 69 | + tracing_subscriber::fmt::init(); |
| 70 | + |
| 71 | + // Load configuration from file. |
| 72 | + let config_path = |
| 73 | + env::var("CONFIG_PATH").context("CONFIG_PATH environment variable not set")?; |
| 74 | + info!("Loading config file: {}", config_path); |
| 75 | + |
| 76 | + let config: Config = serde_yaml::from_reader( |
| 77 | + std::fs::File::open(&config_path) |
| 78 | + .context(format!("Cannot open configuration file {config_path}"))?, |
| 79 | + ) |
| 80 | + .context("Failed to parse configuration file")?; |
| 81 | + let grpc_client = create_grpc_client(&config.network)?; |
| 82 | + |
| 83 | + info!( |
| 84 | + "Starting aggregator for KeyServer {} on {:?}", |
| 85 | + config.key_server_object_id, config.network |
| 86 | + ); |
| 87 | + |
| 88 | + let state = AppState { |
| 89 | + key_server_object_id: config.key_server_object_id, |
| 90 | + network: config.network, |
| 91 | + grpc_client, |
| 92 | + threshold: 0, // TODO: Load from onchain |
| 93 | + committee_members: Arc::new(vec![]), // TODO: Load from onchain |
| 94 | + }; |
| 95 | + |
| 96 | + // TODO: Spawn background task to watch onchain for committee version updates: |
| 97 | + // 1. Every 30s, fetch KeyServerV2.version from onchain |
| 98 | + // 2. If version changes, refresh committee_members in AppState |
| 99 | + info!( |
| 100 | + "Loaded committee with {} members, threshold {}", |
| 101 | + state.committee_members.len(), |
| 102 | + state.threshold |
| 103 | + ); |
| 104 | + |
| 105 | + let port: u16 = env::var("PORT") |
| 106 | + .unwrap_or_else(|_| DEFAULT_PORT.to_string()) |
| 107 | + .parse() |
| 108 | + .context("Invalid PORT")?; |
| 109 | + |
| 110 | + let app = get_mysten_service::<AppState>(package_name!(), package_version!()) |
| 111 | + .merge(Router::new().route("/v1/fetch_key", post(handle_fetch_key))) |
| 112 | + .with_state(state) |
| 113 | + .layer(CorsLayer::new().allow_origin(Any)); |
| 114 | + |
| 115 | + let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); |
| 116 | + let listener = tokio::net::TcpListener::bind(addr).await?; |
| 117 | + info!("Aggregator server listening on http://localhost:{}", port); |
| 118 | + |
| 119 | + axum::serve(listener, app).await?; |
| 120 | + Ok(()) |
| 121 | +} |
| 122 | + |
| 123 | +/// Handle fetch_key request by fanning out to committee members and aggregating responses. |
| 124 | +async fn handle_fetch_key( |
| 125 | + State(state): State<AppState>, |
| 126 | + Json(request): Json<FetchKeyRequest>, |
| 127 | +) -> Result<(HeaderMap, Json<FetchKeyResponse>), AggregatorError> { |
| 128 | + // TODO: |
| 129 | + // 1. Call fetch_from_member for all committee members in parallel. |
| 130 | + // 2. Collect responses until we have t successful responses and abort others. |
| 131 | + |
| 132 | + // 3. Track versions from each response header (X-KeyServer-Version). |
| 133 | + // Use the oldest version as the aggregator's response version (?) |
| 134 | + |
| 135 | + // 4. Upon sufficient responses, Aaggregate encrypted responses using crypto::elgamal::aggregate_encrypted |
| 136 | + |
| 137 | + // 5. Return with appropriate headers |
| 138 | + |
| 139 | + unimplemented!("depends on crypto code") |
| 140 | +} |
| 141 | + |
| 142 | +/// Fetch encrypted partial key from a single committee member. |
| 143 | +async fn fetch_from_member( |
| 144 | + member: &PartialKeyServer, |
| 145 | + request: &FetchKeyRequest, |
| 146 | +) -> Result<(FetchKeyResponse, String), String> { |
| 147 | + // TODO: |
| 148 | + // 1. Implement HTTP client call to each member URL. |
| 149 | + // 2. Extract each and validate X-KeyServer-Version header with MIN_SERVER_VERSION |
| 150 | + // 3. Parse response body as FetchKeyResponse |
| 151 | + // 4. Verify encrypted signatures using crypto::ibe::verify_encrypted_signature. |
| 152 | + // 5. Return (response, version_string) |
| 153 | + |
| 154 | + unimplemented!("not implemented yet") |
| 155 | +} |
| 156 | + |
| 157 | +/// Load committee state from onchain KeyServerV2 object. |
| 158 | +async fn load_committee_state(key_server_obj_id: &Address, network: Network) -> Result<AppState> { |
| 159 | + // TODO: |
| 160 | + // 1. Use grpc client to fetch KeyServerV2 object from chain. |
| 161 | + // 2. Parse committee members and threshold. |
| 162 | + // 3. Return AppState with loaded data. |
| 163 | + |
| 164 | + unimplemented!("not implemented yet") |
| 165 | +} |
0 commit comments