diff --git a/src/daemon/context.rs b/src/daemon/context.rs
index 3829156135e..f6a60538967 100644
--- a/src/daemon/context.rs
+++ b/src/daemon/context.rs
@@ -334,12 +334,13 @@ fn handle_admin_token(
         ki.private_key(),
         token_exp,
     )?;
-    info!("Admin token: {token}");
     let default_token_path = config.client.default_rpc_token_path();
     if let Err(e) =
         crate::utils::io::write_new_sensitive_file(token.as_bytes(), &default_token_path)
     {
         tracing::warn!("Failed to save the default admin token file: {e}");
+    } else {
+        info!("Admin token is saved to {}", default_token_path.display());
     }
     if let Some(path) = opts.save_token.as_ref() {
         if let Some(dir) = path.parent()
@@ -354,6 +355,7 @@ fn handle_admin_token(
         }
         std::fs::write(path, &token)
            .with_context(|| format!("Failed to save admin token to {}", path.display()))?;
+        info!("Admin token is saved to {}", path.display());
     }
 
     Ok(token)
diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs
index 5b65cce635c..c753a239aed 100644
--- a/src/daemon/mod.rs
+++ b/src/daemon/mod.rs
@@ -80,7 +80,6 @@ pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Result<()> {
     let start_time = chrono::Utc::now();
     let mut terminate = signal(SignalKind::terminate())?;
     let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);
-
     let result = tokio::select! {
         ret = start(start_time, opts, config, shutdown_send) => ret,
         _ = ctrl_c() => {
@@ -375,6 +374,7 @@ fn maybe_start_rpc_service(
     chain_follower: &ChainFollower,
     start_time: chrono::DateTime<chrono::Utc>,
     shutdown: mpsc::Sender<()>,
+    rpc_stop_handle: jsonrpsee::server::StopHandle,
     ctx: &AppContext,
 ) -> anyhow::Result<()> {
     if config.client.enable_rpc {
@@ -402,6 +402,12 @@ fn maybe_start_rpc_service(
         let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
         let msgs_in_tipset = Arc::new(crate::chain::MsgsInTipsetCache::default());
         async move {
+            let rpc_listener = tokio::net::TcpListener::bind(rpc_address)
+                .await
+                .map_err(|e| {
+                    anyhow::anyhow!("Unable to listen on RPC endpoint {rpc_address}: {e}")
+                })
+                .unwrap();
             start_rpc(
                 RPCState {
                     state_manager,
@@ -417,7 +423,8 @@ fn maybe_start_rpc_service(
                     tipset_send,
                     snapshot_progress_tracker,
                 },
-                rpc_address,
+                rpc_listener,
+                rpc_stop_handle,
                 filter_list,
             )
             .await
@@ -559,11 +566,16 @@ pub(super) async fn start(
         });
     }
     loop {
+        let (rpc_stop_handle, rpc_server_handle) = jsonrpsee::server::stop_channel();
         tokio::select! {
             _ = snap_gc_reboot_rx.recv_async() => {
+                // gracefully shutdown RPC server
+                if let Err(e) = rpc_server_handle.stop() {
+                    tracing::warn!("failed to stop RPC server: {e}");
+                }
                 snap_gc.cleanup_before_reboot().await;
             }
-            result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), |ctx, sync_status| {
+            result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), rpc_stop_handle, |ctx, sync_status| {
                 snap_gc.set_db(ctx.db.clone());
                 snap_gc.set_sync_status(sync_status);
                 snap_gc.set_car_db_head_epoch(ctx.db.heaviest_tipset().map(|ts|ts.epoch()).unwrap_or_default());
@@ -579,6 +591,7 @@ pub(super) async fn start_services(
     opts: &CliOpts,
     mut config: Config,
     shutdown_send: mpsc::Sender<()>,
+    rpc_stop_handle: jsonrpsee::server::StopHandle,
     on_app_context_and_db_initialized: impl FnOnce(&AppContext, SyncStatus),
 ) -> anyhow::Result<()> {
     // Cleanup the collector prometheus metrics registry on start
@@ -608,6 +621,7 @@ pub(super) async fn start_services(
         &chain_follower,
         start_time,
         shutdown_send.clone(),
+        rpc_stop_handle,
         &ctx,
     )?;
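
Reviewer note (not part of the patch): the daemon now creates a fresh stop/server handle pair on every pass through its restart loop and threads the `StopHandle` down to `start_rpc`, so the RPC server can be stopped cleanly before a snapshot-GC reboot and re-created on the next iteration. A minimal sketch of the jsonrpsee primitive involved, using only `stop_channel` and `ServerHandle::stop` as they appear in this diff; `run_server` is a hypothetical stand-in for `start_rpc`, and the assumption that `StopHandle::shutdown` resolves once `stop` has been called is mine:

    use jsonrpsee::server::{StopHandle, stop_channel};

    // Stand-in for `start_rpc`: the real function accepts connections in a loop
    // and serves them until the paired ServerHandle signals stop.
    async fn run_server(mut stop_handle: StopHandle) {
        stop_handle.shutdown().await; // assumed: resolves after ServerHandle::stop()
    }

    #[tokio::main]
    async fn main() {
        let (stop_handle, server_handle) = stop_channel();
        let server = tokio::spawn(run_server(stop_handle));
        // ... later, e.g. right before rebooting for snapshot GC:
        if let Err(e) = server_handle.stop() {
            eprintln!("failed to stop RPC server: {e}");
        }
        server.await.unwrap();
    }
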
diff --git a/src/rpc/methods/auth.rs b/src/rpc/methods/auth.rs
index 6ffb56181d4..faec5406324 100644
--- a/src/rpc/methods/auth.rs
+++ b/src/rpc/methods/auth.rs
@@ -1,9 +1,12 @@
 // Copyright 2019-2026 ChainSafe Systems
 // SPDX-License-Identifier: Apache-2.0, MIT
 
-use crate::auth::*;
-use crate::lotus_json::lotus_json_with_self;
-use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError};
+use crate::{
+    KeyStore,
+    auth::*,
+    lotus_json::lotus_json_with_self,
+    rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError},
+};
 use anyhow::Result;
 use chrono::Duration;
 use enumflags2::BitFlags;
@@ -14,6 +17,18 @@ use serde_with::{DurationSeconds, serde_as};
 
 /// RPC call to create a new JWT Token
 pub enum AuthNew {}
+
+impl AuthNew {
+    pub fn create_token(
+        keystore: &KeyStore,
+        token_exp: Duration,
+        permissions: Vec<String>,
+    ) -> anyhow::Result<String> {
+        let ki = keystore.get(JWT_IDENTIFIER)?;
+        Ok(create_token(permissions, ki.private_key(), token_exp)?)
+    }
+}
+
 impl RpcMethod<2> for AuthNew {
     const NAME: &'static str = "Filecoin.AuthNew";
     const N_REQUIRED_PARAMS: usize = 1;
@@ -28,14 +43,13 @@ impl RpcMethod<2> for AuthNew {
         (permissions, expiration_secs): Self::Params,
     ) -> Result<Self::Ok, ServerError> {
         let ks = ctx.keystore.read();
-        let ki = ks.get(JWT_IDENTIFIER)?;
         // Lotus admin tokens do not expire but Forest requires all JWT tokens to
         // have an expiration date. So we set the expiration date to 100 years in
         // the future to match user-visible behavior of Lotus.
         let token_exp = expiration_secs
             .map(chrono::Duration::seconds)
             .unwrap_or_else(|| chrono::Duration::days(365 * 100));
-        let token = create_token(permissions, ki.private_key(), token_exp)?;
+        let token = Self::create_token(&ks, token_exp, permissions)?;
         Ok(token.as_bytes().to_vec())
     }
 }
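
Reviewer note (not part of the patch): pulling token creation out of the RPC handler into `AuthNew::create_token` lets the offline-server setup and the new integration test mint JWTs from a keystore directly. A small sketch of the call pattern, using only types and paths that appear in this diff (`read_only_token` is a hypothetical helper):

    use crate::KeyStore;
    use crate::rpc::methods::auth::AuthNew;

    // Mint a short-lived, read-only token from an already-initialized keystore,
    // i.e. one that already holds the JWT key (see the keystore setup elsewhere in this diff).
    fn read_only_token(keystore: &KeyStore) -> anyhow::Result<String> {
        AuthNew::create_token(keystore, chrono::Duration::hours(1), vec!["read".to_owned()])
    }
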
diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs
index e7d55df8e16..130ca7faf42 100644
--- a/src/rpc/mod.rs
+++ b/src/rpc/mod.rs
@@ -443,11 +443,10 @@ use fvm_ipld_blockstore::Blockstore;
 use jsonrpsee::{
     Methods,
     core::middleware::RpcServiceBuilder,
-    server::{RpcModule, Server, StopHandle, TowerServiceBuilder, stop_channel},
+    server::{RpcModule, Server, StopHandle, TowerServiceBuilder},
 };
 use parking_lot::RwLock;
 use std::env;
-use std::net::SocketAddr;
 use std::sync::{Arc, LazyLock};
 use std::time::Duration;
 use tokio::sync::mpsc;
@@ -529,7 +528,8 @@ struct PerConnection<RpcMiddleware, HttpMiddleware> {
 
 pub async fn start_rpc<DB>(
     state: RPCState<DB>,
-    rpc_endpoint: SocketAddr,
+    rpc_listener: tokio::net::TcpListener,
+    stop_handle: StopHandle,
     filter_list: Option<FilterList>,
 ) -> anyhow::Result<()>
 where
@@ -556,8 +556,6 @@ where
     let methods: Arc<HashMap<ApiPaths, Methods>> =
         Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect());
 
-    let (stop_handle, _server_handle) = stop_channel();
-
     let per_conn = PerConnection {
         stop_handle: stop_handle.clone(),
         svc_builder: Server::builder()
@@ -580,12 +578,10 @@ where
             .to_service_builder(),
         keystore,
     };
-
-    let listener = tokio::net::TcpListener::bind(rpc_endpoint).await.unwrap();
     tracing::info!("Ready for RPC connections");
     loop {
         let sock = tokio::select! {
-            res = listener.accept() => {
+            res = rpc_listener.accept() => {
                 match res {
                     Ok((stream, _remote_addr)) => stream,
                     Err(e) => {
@@ -788,7 +784,14 @@ pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenRPC {
 
 #[cfg(test)]
 mod tests {
-    use crate::rpc::ApiPaths;
+    use super::*;
+    use crate::{
+        db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion,
+        tool::offline_server::server::offline_rpc_state,
+    };
+    use jsonrpsee::server::stop_channel;
+    use std::net::{Ipv4Addr, SocketAddr};
+    use tokio::task::JoinSet;
 
     // `cargo test --lib -- --exact 'rpc::tests::openrpc'`
     // `cargo insta review`
@@ -807,4 +810,62 @@ mod tests {
             insta::assert_yaml_snapshot!(_spec);
         }
     }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_rpc_server() {
+        let chain = NetworkChain::Calibnet;
+        let db = Arc::new(MemoryDB::default());
+        let mut services = JoinSet::new();
+        let (state, mut shutdown_recv) = offline_rpc_state(chain, db, None, None, &mut services)
+            .await
+            .unwrap();
+        let block_delay_secs = state.chain_config().block_delay_secs;
+        let shutdown_send = state.shutdown.clone();
+        let jwt_read_permissions = vec!["read".to_owned()];
+        let jwt_read = super::methods::auth::AuthNew::create_token(
+            &state.keystore.read(),
+            chrono::Duration::hours(1),
+            jwt_read_permissions.clone(),
+        )
+        .unwrap();
+        let rpc_listener =
+            tokio::net::TcpListener::bind(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
+                .await
+                .unwrap();
+        let rpc_address = rpc_listener.local_addr().unwrap();
+        let (stop_handle, server_handle) = stop_channel();
+
+        // Start an RPC server
+
+        let handle = tokio::spawn(start_rpc(state, rpc_listener, stop_handle, None));
+
+        // Send a few requests
+
+        let client = Client::from_url(
+            format!("http://{}:{}/", rpc_address.ip(), rpc_address.port())
+                .parse()
+                .unwrap(),
+        );
+
+        let response = super::methods::common::Version::call(&client, ())
+            .await
+            .unwrap();
+        assert_eq!(
+            &response.version,
+            &*crate::utils::version::FOREST_VERSION_STRING
+        );
+        assert_eq!(response.block_delay, block_delay_secs);
+        assert_eq!(response.api_version, ShiftingVersion::new(2, 3, 0));
+
+        let response = super::methods::auth::AuthVerify::call(&client, (jwt_read,))
+            .await
+            .unwrap();
+        assert_eq!(response, jwt_read_permissions);
+
+        // Gracefully shutdown the RPC server
+        shutdown_send.send(()).await.unwrap();
+        shutdown_recv.recv().await;
+        server_handle.stop().unwrap();
+        handle.await.unwrap().unwrap();
+    }
 }
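
Reviewer note (not part of the patch): `start_rpc` now receives an already-bound `tokio::net::TcpListener` instead of a `SocketAddr`, so bind failures surface in the caller and a test can bind to port 0 and learn the ephemeral port before the server starts, as `test_rpc_server` above does. Caller-side sketch (`bind_then_serve` is hypothetical; the commented `start_rpc` call shows where the listener is handed off):

    async fn bind_then_serve() -> anyhow::Result<()> {
        // Bind first (port 0 lets the OS pick a free port) so the real address is
        // known before the server runs; start_rpc then only accepts connections.
        let rpc_listener =
            tokio::net::TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0)).await?;
        let rpc_address = rpc_listener.local_addr()?;
        println!("RPC will listen on {rpc_address}");
        // start_rpc(state, rpc_listener, stop_handle, None).await
        Ok(())
    }
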
diff --git a/src/tool/mod.rs b/src/tool/mod.rs
index 2112243ebe0..191cd338e55 100644
--- a/src/tool/mod.rs
+++ b/src/tool/mod.rs
@@ -1,5 +1,5 @@
 // Copyright 2019-2026 ChainSafe Systems
 // SPDX-License-Identifier: Apache-2.0, MIT
 pub mod main;
-mod offline_server;
+pub mod offline_server;
 pub mod subcommands;
diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs
index 47f5ae30fb1..849e70c19a2 100644
--- a/src/tool/offline_server/server.rs
+++ b/src/tool/offline_server/server.rs
@@ -9,7 +9,9 @@ use crate::cli_shared::cli::EventsConfig;
 use crate::cli_shared::snapshot::TrustedVendor;
 use crate::daemon::db_util::RangeSpec;
 use crate::daemon::db_util::backfill_db;
-use crate::db::{MemoryDB, car::ManyCar};
+use crate::db::{
+    EthMappingsStore, HeaviestTipsetKeyProvider, MemoryDB, SettingsStore, car::ManyCar,
+};
 use crate::genesis::read_genesis_header;
 use crate::key_management::{KeyStore, KeyStoreConfig};
 use crate::libp2p::PeerManager;
@@ -24,11 +26,12 @@ use crate::utils::proofs_api::{self, ensure_proof_params_downloaded};
 use crate::{Config, JWT_IDENTIFIER};
 use anyhow::Context as _;
 use fvm_ipld_blockstore::Blockstore;
+use jsonrpsee::server::stop_channel;
 use parking_lot::RwLock;
-use std::mem::discriminant;
 use std::{
+    mem::discriminant,
     net::{IpAddr, Ipv4Addr, SocketAddr},
-    path::PathBuf,
+    path::{Path, PathBuf},
     sync::Arc,
 };
 use tokio::{
@@ -41,6 +44,96 @@ use tokio::{
 };
 use tracing::{info, warn};
 
+/// Builds offline RPC state and returns it with a shutdown receiver.
+/// The receiver is notified when RPC shutdown is requested.
+pub async fn offline_rpc_state<DB>(
+    chain: NetworkChain,
+    db: Arc<DB>,
+    genesis_fp: Option<&Path>,
+    save_jwt_token: Option<&Path>,
+    services: &mut JoinSet<anyhow::Result<()>>,
+) -> anyhow::Result<(RPCState<DB>, mpsc::Receiver<()>)>
+where
+    DB: Blockstore
+        + SettingsStore
+        + HeaviestTipsetKeyProvider
+        + EthMappingsStore
+        + Send
+        + Sync
+        + 'static,
+{
+    let chain_config = Arc::new(handle_chain_config(&chain)?);
+    let events_config = Arc::new(EventsConfig::default());
+    let genesis_header = read_genesis_header(
+        genesis_fp,
+        chain_config.genesis_bytes(&db).await?.as_deref(),
+        &db,
+    )
+    .await?;
+    // let head_ts = db.heaviest_tipset()?;
+    let chain_store = Arc::new(ChainStore::new(
+        db.clone(),
+        db.clone(),
+        db.clone(),
+        chain_config,
+        genesis_header.clone(),
+    )?);
+    let state_manager = Arc::new(StateManager::new(chain_store.clone())?);
+    let (network_send, _) = flume::bounded(5);
+    let (tipset_send, _) = flume::bounded(5);
+
+    let message_pool = MessagePool::new(
+        MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
+        network_send.clone(),
+        Default::default(),
+        state_manager.chain_config().clone(),
+        services,
+    )?;
+
+    let (shutdown, shutdown_recv) = mpsc::channel(1);
+
+    let mut keystore = KeyStore::new(KeyStoreConfig::Memory)?;
+    keystore.put(JWT_IDENTIFIER, generate_priv_key())?;
+    let ki = keystore.get(JWT_IDENTIFIER)?;
+    // Lotus admin tokens do not expire but Forest requires all JWT tokens to
+    // have an expiration date. So we set the expiration date to 100 years in
+    // the future to match user-visible behavior of Lotus.
+    let token_exp = chrono::Duration::days(365 * 100);
+    let token = crate::auth::create_token(
+        crate::auth::ADMIN.iter().map(ToString::to_string).collect(),
+        ki.private_key(),
+        token_exp,
+    )?;
+    if let Some(path) = save_jwt_token {
+        crate::utils::io::write_new_sensitive_file(token.as_bytes(), path)?;
+        info!("Admin token is saved to {}", path.display());
+    } else {
+        info!("Admin token generated. Use --save-token to persist it.");
+    }
+
+    let peer_manager = Arc::new(PeerManager::default());
+    let sync_network_context =
+        SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
+
+    Ok((
+        RPCState {
+            state_manager,
+            keystore: Arc::new(RwLock::new(keystore)),
+            mpool: Arc::new(message_pool),
+            bad_blocks: Default::default(),
+            msgs_in_tipset: Default::default(),
+            sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
+            eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)),
+            sync_network_context,
+            start_time: chrono::Utc::now(),
+            shutdown,
+            tipset_send,
+            snapshot_progress_tracker: Default::default(),
+        },
+        shutdown_recv,
+    ))
+}
+
 #[allow(clippy::too_many_arguments)]
 pub async fn start_offline_server(
     snapshot_files: Vec<PathBuf>,
@@ -54,17 +147,23 @@ pub async fn start_offline_server(
 ) -> anyhow::Result<()> {
     info!("Configuring Offline RPC Server");
 
-    let db = Arc::new(ManyCar::new(MemoryDB::default()));
-
-    let snapshot_files = handle_snapshots(
-        snapshot_files,
-        chain.as_ref(),
-        auto_download_snapshot,
-        genesis.clone(),
-    )
-    .await?;
+    // Set proof parameter data dir and make sure the proofs are available. Otherwise,
+    // validation might fail due to missing proof parameters.
+    proofs_api::maybe_set_proofs_parameter_cache_dir_env(&Config::default().client.data_dir);
+    ensure_proof_params_downloaded().await?;
 
-    db.read_only_files(snapshot_files.iter().cloned())?;
+    let db = {
+        let db = Arc::new(ManyCar::new(MemoryDB::default()));
+        let snapshot_files = handle_snapshots(
+            snapshot_files,
+            chain.as_ref(),
+            auto_download_snapshot,
+            genesis.clone(),
+        )
+        .await?;
+        db.read_only_files(snapshot_files.iter().cloned())?;
+        db
+    };
 
     let inferred_chain = {
         let head = db.heaviest_tipset()?;
@@ -80,49 +179,27 @@ pub async fn start_offline_server(
     } else {
         inferred_chain
     };
-
-    let chain_config = Arc::new(handle_chain_config(&chain)?);
-    let events_config = Arc::new(EventsConfig::default());
-    let genesis_header = read_genesis_header(
+    let mut services = JoinSet::new();
+    let (rpc_state, shutdown_recv) = offline_rpc_state(
+        chain,
+        db,
         genesis.as_deref(),
-        chain_config.genesis_bytes(&db).await?.as_deref(),
-        &db,
+        save_jwt_token.as_deref(),
+        &mut services,
     )
     .await?;
-    let head_ts = db.heaviest_tipset()?;
-    let chain_store = Arc::new(ChainStore::new(
-        db.clone(),
-        db.clone(),
-        db.clone(),
-        chain_config,
-        genesis_header.clone(),
-    )?);
-    let state_manager = Arc::new(StateManager::new(chain_store.clone())?);
-
-    // Set proof parameter data dir and make sure the proofs are available. Otherwise,
-    // validation might fail due to missing proof parameters.
-    proofs_api::maybe_set_proofs_parameter_cache_dir_env(&Config::default().client.data_dir);
-    ensure_proof_params_downloaded().await?;
+    let state_manager = &rpc_state.state_manager;
+    let head_ts = state_manager.heaviest_tipset();
 
     if index_backfill_epochs > 0 {
         backfill_db(
-            &state_manager,
+            state_manager,
             &head_ts,
             RangeSpec::NumTipsets(index_backfill_epochs),
         )
        .await?;
     }
 
-    let (network_send, _) = flume::bounded(5);
-    let (tipset_send, _) = flume::bounded(5);
-    let message_pool: MessagePool<MpoolRpcProvider<Arc<ManyCar<MemoryDB>>>> = MessagePool::new(
-        MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
-        network_send.clone(),
-        Default::default(),
-        state_manager.chain_config().clone(),
-        &mut JoinSet::new(),
-    )?;
-
     // Validate tipsets since the {height} EPOCH when `height >= 0`,
     // or valiadte the last {-height} EPOCH(s) when `height < 0`
     let validate_until_epoch = if height > 0 {
@@ -133,48 +210,11 @@ pub async fn start_offline_server(
     if validate_until_epoch <= head_ts.epoch() {
         state_manager.validate_tipsets(
             head_ts
-                .chain(&db)
+                .chain(rpc_state.store())
                 .take_while(|ts| ts.epoch() >= validate_until_epoch),
         )?;
     }
 
-    let (shutdown, shutdown_recv) = mpsc::channel(1);
-
-    let mut keystore = KeyStore::new(KeyStoreConfig::Memory)?;
-    keystore.put(JWT_IDENTIFIER, generate_priv_key())?;
-    let ki = keystore.get(JWT_IDENTIFIER)?;
-    // Lotus admin tokens do not expire but Forest requires all JWT tokens to
-    // have an expiration date. So we set the expiration date to 100 years in
-    // the future to match user-visible behavior of Lotus.
-    let token_exp = chrono::Duration::days(365 * 100);
-    let token = crate::auth::create_token(
-        crate::auth::ADMIN.iter().map(ToString::to_string).collect(),
-        ki.private_key(),
-        token_exp,
-    )?;
-    info!("Admin token: {token}");
-    if let Some(path) = save_jwt_token {
-        std::fs::write(path, token)?;
-    }
-
-    let peer_manager = Arc::new(PeerManager::default());
-    let sync_network_context =
-        SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
-
-    let rpc_state = RPCState {
-        state_manager,
-        keystore: Arc::new(RwLock::new(keystore)),
-        mpool: Arc::new(message_pool),
-        bad_blocks: Default::default(),
-        msgs_in_tipset: Default::default(),
-        sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
-        eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)),
-        sync_network_context,
-        start_time: chrono::Utc::now(),
-        shutdown,
-        tipset_send,
-        snapshot_progress_tracker: Default::default(),
-    };
 
     start_offline_rpc(rpc_state, rpc_port, shutdown_recv).await?;
 
     Ok(())
@@ -190,10 +230,11 @@ where
 {
     info!("Starting offline RPC Server");
     let rpc_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), rpc_port);
+    let rpc_listener = tokio::net::TcpListener::bind(rpc_address).await?;
     let mut terminate = signal(SignalKind::terminate())?;
-
+    let (stop_handle, server_handle) = stop_channel();
     let result = tokio::select! {
-        ret = start_rpc(state, rpc_address, None) => ret,
+        ret = start_rpc(state, rpc_listener, stop_handle, None) => ret,
         _ = ctrl_c() => {
             info!("Keyboard interrupt.");
             Ok(())
@@ -207,6 +248,9 @@ where
             Ok(())
         },
     };
+    if let Err(e) = server_handle.stop() {
+        tracing::warn!("{e}");
+    }
     crate::utils::io::terminal_cleanup();
     result
 }
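
Reviewer note (not part of the patch): with `offline_server` now public, the state-building and server halves can be composed outside the daemon as well. The sketch below mirrors what `rpc::tests::test_rpc_server` does, wrapped in a hypothetical in-crate helper (`serve_offline`); only names that appear in this diff are used:

    async fn serve_offline() -> anyhow::Result<()> {
        let db = std::sync::Arc::new(crate::db::MemoryDB::default());
        let mut services = tokio::task::JoinSet::new();
        // Build RPC state for Calibnet without persisting the admin token.
        let (state, mut shutdown_recv) = crate::tool::offline_server::server::offline_rpc_state(
            crate::networks::NetworkChain::Calibnet,
            db,
            None, // genesis_fp: fall back to the built-in genesis
            None, // save_jwt_token: keep the admin token in memory only
            &mut services,
        )
        .await?;
        // Bind eagerly, then run the server until a shutdown is requested.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
        let (stop_handle, server_handle) = jsonrpsee::server::stop_channel();
        let server = tokio::spawn(crate::rpc::start_rpc(state, listener, stop_handle, None));
        // Wait for a shutdown request (e.g. from the Shutdown RPC method), then stop.
        shutdown_recv.recv().await;
        if let Err(e) = server_handle.stop() {
            tracing::warn!("{e}");
        }
        server.await?
    }
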