From bee321c3437726038138544e07e1a2754e31c5d3 Mon Sep 17 00:00:00 2001
From: hanabi1224 
Date: Wed, 28 Jan 2026 23:03:31 +0800
Subject: [PATCH 1/6] test: unit test for `start_rpc`

---
 src/daemon/mod.rs                 |  20 ++-
 src/rpc/methods/auth.rs           |  24 +++-
 src/rpc/mod.rs                    |  75 +++++++++--
 src/tool/mod.rs                   |   2 +-
 src/tool/offline_server/server.rs | 204 +++++++++++++++++------------
 5 files changed, 220 insertions(+), 105 deletions(-)

diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs
index 5b65cce635c..e92e5615d32 100644
--- a/src/daemon/mod.rs
+++ b/src/daemon/mod.rs
@@ -80,7 +80,6 @@ pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Resul
     let start_time = chrono::Utc::now();
     let mut terminate = signal(SignalKind::terminate())?;
     let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);
-
     let result = tokio::select! {
         ret = start(start_time, opts, config, shutdown_send) => ret,
         _ = ctrl_c() => {
@@ -375,6 +374,7 @@ fn maybe_start_rpc_service(
     chain_follower: &ChainFollower,
     start_time: chrono::DateTime<chrono::Utc>,
     shutdown: mpsc::Sender<()>,
+    rpc_stop_handle: jsonrpsee::server::StopHandle,
     ctx: &AppContext,
 ) -> anyhow::Result<()> {
     if config.client.enable_rpc {
@@ -402,6 +402,12 @@ fn maybe_start_rpc_service(
         let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
         let msgs_in_tipset = Arc::new(crate::chain::MsgsInTipsetCache::default());
         async move {
+            let rpc_listener = tokio::net::TcpListener::bind(rpc_address)
+                .await
+                .map_err(|e| {
+                    anyhow::anyhow!("Unable to listen on RPC endpoint {rpc_address}: {e}")
+                })
+                .unwrap();
             start_rpc(
                 RPCState {
                     state_manager,
@@ -417,7 +423,8 @@ fn maybe_start_rpc_service(
                     tipset_send,
                     snapshot_progress_tracker,
                 },
-                rpc_address,
+                rpc_listener,
+                rpc_stop_handle,
                 filter_list,
             )
             .await
@@ -559,11 +566,16 @@ pub(super) async fn start(
         });
     }
     loop {
+        let (rpc_stop_handle, rpc_server_handle) = jsonrpsee::server::stop_channel();
         tokio::select! {
            _ = snap_gc_reboot_rx.recv_async() => {
                snap_gc.cleanup_before_reboot().await;
+                // gracefully shutdown RPC server
+                if let Err(e) = rpc_server_handle.stop() {
+                    tracing::warn!("failed to stop RPC server: {e}");
+                }
             }
-            result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), |ctx, sync_status| {
+            result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), rpc_stop_handle, |ctx, sync_status| {
                 snap_gc.set_db(ctx.db.clone());
                 snap_gc.set_sync_status(sync_status);
                 snap_gc.set_car_db_head_epoch(ctx.db.heaviest_tipset().map(|ts|ts.epoch()).unwrap_or_default());
@@ -579,6 +591,7 @@ pub(super) async fn start_services(
     opts: &CliOpts,
     mut config: Config,
     shutdown_send: mpsc::Sender<()>,
+    rpc_stop_handle: jsonrpsee::server::StopHandle,
     on_app_context_and_db_initialized: impl FnOnce(&AppContext, SyncStatus),
 ) -> anyhow::Result<()> {
     // Cleanup the collector prometheus metrics registry on start
@@ -608,6 +621,7 @@ pub(super) async fn start_services(
         &chain_follower,
         start_time,
         shutdown_send.clone(),
+        rpc_stop_handle,
         &ctx,
     )?;

diff --git a/src/rpc/methods/auth.rs b/src/rpc/methods/auth.rs
index 6ffb56181d4..faec5406324 100644
--- a/src/rpc/methods/auth.rs
+++ b/src/rpc/methods/auth.rs
@@ -1,9 +1,12 @@
 // Copyright 2019-2026 ChainSafe Systems
 // SPDX-License-Identifier: Apache-2.0, MIT
 
-use crate::auth::*;
-use crate::lotus_json::lotus_json_with_self;
-use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError};
+use crate::{
+    KeyStore,
+    auth::*,
+    lotus_json::lotus_json_with_self,
+    rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError},
+};
 use anyhow::Result;
 use chrono::Duration;
 use enumflags2::BitFlags;
@@ -14,6 +17,18 @@ use serde_with::{DurationSeconds, serde_as};
 
 /// RPC call to create a new JWT Token
 pub enum AuthNew {}
+
+impl AuthNew {
+    pub fn create_token(
+        keystore: &KeyStore,
+        token_exp: Duration,
+        permissions: Vec<String>,
+    ) -> anyhow::Result<String> {
+        let ki = keystore.get(JWT_IDENTIFIER)?;
+        Ok(create_token(permissions, ki.private_key(), token_exp)?)
+    }
+}
+
 impl RpcMethod<2> for AuthNew {
     const NAME: &'static str = "Filecoin.AuthNew";
     const N_REQUIRED_PARAMS: usize = 1;
@@ -28,14 +43,13 @@ impl RpcMethod<2> for AuthNew {
         (permissions, expiration_secs): Self::Params,
     ) -> Result<Self::Ok, ServerError> {
         let ks = ctx.keystore.read();
-        let ki = ks.get(JWT_IDENTIFIER)?;
         // Lotus admin tokens do not expire but Forest requires all JWT tokens to
         // have an expiration date. So we set the expiration date to 100 years in
        // the future to match user-visible behavior of Lotus.
         let token_exp = expiration_secs
             .map(chrono::Duration::seconds)
             .unwrap_or_else(|| chrono::Duration::days(365 * 100));
-        let token = create_token(permissions, ki.private_key(), token_exp)?;
+        let token = Self::create_token(&ks, token_exp, permissions)?;
         Ok(token.as_bytes().to_vec())
     }
 }
diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs
index e4348967a92..bfc702e8634 100644
--- a/src/rpc/mod.rs
+++ b/src/rpc/mod.rs
@@ -437,11 +437,10 @@ use fvm_ipld_blockstore::Blockstore;
 use jsonrpsee::{
     Methods,
     core::middleware::RpcServiceBuilder,
-    server::{RpcModule, Server, StopHandle, TowerServiceBuilder, stop_channel},
+    server::{RpcModule, Server, StopHandle, TowerServiceBuilder},
 };
 use parking_lot::RwLock;
 use std::env;
-use std::net::SocketAddr;
 use std::sync::{Arc, LazyLock};
 use std::time::Duration;
 use tokio::sync::mpsc;
@@ -523,7 +522,8 @@ struct PerConnection<RpcMiddleware, HttpMiddleware> {
 
 pub async fn start_rpc<DB>(
     state: RPCState<DB>,
-    rpc_endpoint: SocketAddr,
+    rpc_listener: tokio::net::TcpListener,
+    stop_handle: StopHandle,
     filter_list: Option<FilterList>,
 ) -> anyhow::Result<()>
 where
@@ -550,8 +550,6 @@ where
     let methods: Arc<HashMap<ApiPaths, Methods>> =
         Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect());
 
-    let (stop_handle, _server_handle) = stop_channel();
-
     let per_conn = PerConnection {
         stop_handle: stop_handle.clone(),
         svc_builder: Server::builder()
@@ -574,12 +572,10 @@ where
             .to_service_builder(),
         keystore,
     };
-
-    let listener = tokio::net::TcpListener::bind(rpc_endpoint).await.unwrap();
     tracing::info!("Ready for RPC connections");
     loop {
         let sock = tokio::select! {
-            res = listener.accept() => {
+            res = rpc_listener.accept() => {
                 match res {
                     Ok((stream, _remote_addr)) => stream,
                     Err(e) => {
@@ -781,7 +777,13 @@ pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenR
 
 #[cfg(test)]
 mod tests {
-    use crate::rpc::ApiPaths;
+    use super::*;
+    use crate::{
+        db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion,
+        tool::offline_server::server::offline_rpc_state,
+    };
+    use jsonrpsee::server::stop_channel;
+    use std::net::{Ipv4Addr, SocketAddr};
 
     // `cargo test --lib -- --exact 'rpc::tests::openrpc'`
     // `cargo insta review`
@@ -800,4 +802,59 @@ mod tests {
             insta::assert_yaml_snapshot!(_spec);
         }
     }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_rpc_server() {
+        let chain = NetworkChain::Calibnet;
+        let db = Arc::new(MemoryDB::default());
+        let (state, mut shutdown_recv) = offline_rpc_state(chain, db, None, None).await.unwrap();
+        let block_delay_secs = state.chain_config().block_delay_secs;
+        let shutdown_send = state.shutdown.clone();
+        let jwt_read_permissions = vec!["read".to_owned()];
+        let jwt_read = super::methods::auth::AuthNew::create_token(
+            &state.keystore.read(),
+            chrono::Duration::hours(1),
+            jwt_read_permissions.clone(),
+        )
+        .unwrap();
+        let rpc_listener =
+            tokio::net::TcpListener::bind(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
+                .await
+                .unwrap();
+        let rpc_address = rpc_listener.local_addr().unwrap();
+        let (stop_handle, server_handle) = stop_channel();
+
+        // Start an RPC server
+
+        let handle = tokio::spawn(start_rpc(state, rpc_listener, stop_handle, None));
+
+        // Send a few requests
+
+        let client = Client::from_url(
+            format!("http://{}:{}/", rpc_address.ip(), rpc_address.port())
+                .parse()
+                .unwrap(),
+        );
+
+        let response = super::methods::common::Version::call(&client, ())
+            .await
+            .unwrap();
+        assert_eq!(
+            &response.version,
+            &*crate::utils::version::FOREST_VERSION_STRING
+        );
+        assert_eq!(response.block_delay, block_delay_secs);
+        assert_eq!(response.api_version, ShiftingVersion::new(2, 3, 0));
+
+        let response = super::methods::auth::AuthVerify::call(&client, (jwt_read,))
+            .await
+            .unwrap();
+        assert_eq!(response, jwt_read_permissions);
+
+        // Gracefully shutdown the RPC server
+        shutdown_send.send(()).await.unwrap();
+        shutdown_recv.recv().await;
+        server_handle.stop().unwrap();
+        handle.await.unwrap().unwrap();
+    }
 }
diff --git a/src/tool/mod.rs b/src/tool/mod.rs
index 2112243ebe0..191cd338e55 100644
--- a/src/tool/mod.rs
+++ b/src/tool/mod.rs
@@ -1,5 +1,5 @@
 // Copyright 2019-2026 ChainSafe Systems
 // SPDX-License-Identifier: Apache-2.0, MIT
 pub mod main;
-mod offline_server;
+pub mod offline_server;
 pub mod subcommands;
diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs
index 47f5ae30fb1..a8bf2927273 100644
--- a/src/tool/offline_server/server.rs
+++ b/src/tool/offline_server/server.rs
@@ -9,7 +9,9 @@ use crate::cli_shared::cli::EventsConfig;
 use crate::cli_shared::snapshot::TrustedVendor;
 use crate::daemon::db_util::RangeSpec;
 use crate::daemon::db_util::backfill_db;
-use crate::db::{MemoryDB, car::ManyCar};
+use crate::db::{
+    EthMappingsStore, HeaviestTipsetKeyProvider, MemoryDB, SettingsStore, car::ManyCar,
+};
 use crate::genesis::read_genesis_header;
 use crate::key_management::{KeyStore, KeyStoreConfig};
 use crate::libp2p::PeerManager;
@@ -24,11 +26,12 @@ use crate::utils::proofs_api::{self, ensure_proof_params_downloaded};
 use crate::{Config, JWT_IDENTIFIER};
 use anyhow::Context as _;
 use fvm_ipld_blockstore::Blockstore;
+use jsonrpsee::server::stop_channel;
 use parking_lot::RwLock;
-use std::mem::discriminant;
 use std::{
+    mem::discriminant,
     net::{IpAddr, Ipv4Addr, SocketAddr},
-    path::PathBuf,
+    path::{Path, PathBuf},
     sync::Arc,
 };
 use tokio::{
@@ -41,6 +44,89 @@ use tokio::{
 };
 use tracing::{info, warn};
 
+pub async fn offline_rpc_state<DB>(
+    chain: NetworkChain,
+    db: Arc<DB>,
+    genesis_fp: Option<&Path>,
+    save_jwt_token: Option<&Path>,
+) -> anyhow::Result<(RPCState<DB>, mpsc::Receiver<()>)>
+where
+    DB: Blockstore
+        + SettingsStore
+        + HeaviestTipsetKeyProvider
+        + EthMappingsStore
+        + Send
+        + Sync
+        + 'static,
+{
+    let chain_config = Arc::new(handle_chain_config(&chain)?);
+    let events_config = Arc::new(EventsConfig::default());
+    let genesis_header = read_genesis_header(
+        genesis_fp,
+        chain_config.genesis_bytes(&db).await?.as_deref(),
+        &db,
+    )
+    .await?;
+    // let head_ts = db.heaviest_tipset()?;
+    let chain_store = Arc::new(ChainStore::new(
+        db.clone(),
+        db.clone(),
+        db.clone(),
+        chain_config,
+        genesis_header.clone(),
+    )?);
+    let state_manager = Arc::new(StateManager::new(chain_store.clone())?);
+    let (network_send, _) = flume::bounded(5);
+    let (tipset_send, _) = flume::bounded(5);
+    let message_pool = MessagePool::new(
+        MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
+        network_send.clone(),
+        Default::default(),
+        state_manager.chain_config().clone(),
+        &mut JoinSet::new(),
+    )?;
+    let (shutdown, shutdown_recv) = mpsc::channel(1);
+
+    let mut keystore = KeyStore::new(KeyStoreConfig::Memory)?;
+    keystore.put(JWT_IDENTIFIER, generate_priv_key())?;
+    let ki = keystore.get(JWT_IDENTIFIER)?;
+    // Lotus admin tokens do not expire but Forest requires all JWT tokens to
+    // have an expiration date. So we set the expiration date to 100 years in
+    // the future to match user-visible behavior of Lotus.
+    let token_exp = chrono::Duration::days(365 * 100);
+    let token = crate::auth::create_token(
+        crate::auth::ADMIN.iter().map(ToString::to_string).collect(),
+        ki.private_key(),
+        token_exp,
+    )?;
+    info!("Admin token: {token}");
+    if let Some(path) = save_jwt_token {
+        std::fs::write(path, token)?;
+    }
+
+    let peer_manager = Arc::new(PeerManager::default());
+    let sync_network_context =
+        SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
+
+    Ok((
+        RPCState {
+            state_manager,
+            keystore: Arc::new(RwLock::new(keystore)),
+            mpool: Arc::new(message_pool),
+            bad_blocks: Default::default(),
+            msgs_in_tipset: Default::default(),
+            sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
+            eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)),
+            sync_network_context,
+            start_time: chrono::Utc::now(),
+            shutdown,
+            tipset_send,
+            snapshot_progress_tracker: Default::default(),
+        },
+        shutdown_recv,
+    ))
+}
+
 #[allow(clippy::too_many_arguments)]
 pub async fn start_offline_server(
     snapshot_files: Vec<PathBuf>,
@@ -54,17 +140,23 @@ ) -> anyhow::Result<()> {
     info!("Configuring Offline RPC Server");
 
-    let db = Arc::new(ManyCar::new(MemoryDB::default()));
-
-    let snapshot_files = handle_snapshots(
-        snapshot_files,
-        chain.as_ref(),
-        auto_download_snapshot,
-        genesis.clone(),
-    )
-    .await?;
+    // Set proof parameter data dir and make sure the proofs are available. Otherwise,
+    // validation might fail due to missing proof parameters.
+    proofs_api::maybe_set_proofs_parameter_cache_dir_env(&Config::default().client.data_dir);
+    ensure_proof_params_downloaded().await?;
 
-    db.read_only_files(snapshot_files.iter().cloned())?;
+    let db = {
+        let db = Arc::new(ManyCar::new(MemoryDB::default()));
+        let snapshot_files = handle_snapshots(
+            snapshot_files,
+            chain.as_ref(),
+            auto_download_snapshot,
+            genesis.clone(),
+        )
+        .await?;
+        db.read_only_files(snapshot_files.iter().cloned())?;
+        db
+    };
 
     let inferred_chain = {
         let head = db.heaviest_tipset()?;
@@ -80,49 +172,20 @@
     } else {
         inferred_chain
     };
+    let (rpc_state, shutdown_recv) =
+        offline_rpc_state(chain, db, genesis.as_deref(), save_jwt_token.as_deref()).await?;
 
-    let chain_config = Arc::new(handle_chain_config(&chain)?);
-    let events_config = Arc::new(EventsConfig::default());
-    let genesis_header = read_genesis_header(
-        genesis.as_deref(),
-        chain_config.genesis_bytes(&db).await?.as_deref(),
-        &db,
-    )
-    .await?;
-    let head_ts = db.heaviest_tipset()?;
-    let chain_store = Arc::new(ChainStore::new(
-        db.clone(),
-        db.clone(),
-        db.clone(),
-        chain_config,
-        genesis_header.clone(),
-    )?);
-    let state_manager = Arc::new(StateManager::new(chain_store.clone())?);
-
-    // Set proof parameter data dir and make sure the proofs are available. Otherwise,
-    // validation might fail due to missing proof parameters.
-    proofs_api::maybe_set_proofs_parameter_cache_dir_env(&Config::default().client.data_dir);
-    ensure_proof_params_downloaded().await?;
-
+    let state_manager = &rpc_state.state_manager;
+    let head_ts = state_manager.heaviest_tipset();
     if index_backfill_epochs > 0 {
         backfill_db(
-            &state_manager,
+            state_manager,
             &head_ts,
             RangeSpec::NumTipsets(index_backfill_epochs),
         )
         .await?;
     }
 
-    let (network_send, _) = flume::bounded(5);
-    let (tipset_send, _) = flume::bounded(5);
-    let message_pool: MessagePool<MpoolRpcProvider<ManyCar<MemoryDB>>> = MessagePool::new(
-        MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
-        network_send.clone(),
-        Default::default(),
-        state_manager.chain_config().clone(),
-        &mut JoinSet::new(),
-    )?;
-
     // Validate tipsets since the {height} EPOCH when `height >= 0`,
     // or validate the last {-height} EPOCH(s) when `height < 0`
     let validate_until_epoch = if height > 0 {
@@ -133,48 +196,11 @@
     if validate_until_epoch <= head_ts.epoch() {
         state_manager.validate_tipsets(
             head_ts
-                .chain(&db)
+                .chain(rpc_state.store())
                 .take_while(|ts| ts.epoch() >= validate_until_epoch),
         )?;
     }
 
-    let (shutdown, shutdown_recv) = mpsc::channel(1);
-
-    let mut keystore = KeyStore::new(KeyStoreConfig::Memory)?;
-    keystore.put(JWT_IDENTIFIER, generate_priv_key())?;
-    let ki = keystore.get(JWT_IDENTIFIER)?;
-    // Lotus admin tokens do not expire but Forest requires all JWT tokens to
-    // have an expiration date. So we set the expiration date to 100 years in
-    // the future to match user-visible behavior of Lotus.
-    let token_exp = chrono::Duration::days(365 * 100);
-    let token = crate::auth::create_token(
-        crate::auth::ADMIN.iter().map(ToString::to_string).collect(),
-        ki.private_key(),
-        token_exp,
-    )?;
-    info!("Admin token: {token}");
-    if let Some(path) = save_jwt_token {
-        std::fs::write(path, token)?;
-    }
-
-    let peer_manager = Arc::new(PeerManager::default());
-    let sync_network_context =
-        SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
-
-    let rpc_state = RPCState {
-        state_manager,
-        keystore: Arc::new(RwLock::new(keystore)),
-        mpool: Arc::new(message_pool),
-        bad_blocks: Default::default(),
-        msgs_in_tipset: Default::default(),
-        sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
-        eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)),
-        sync_network_context,
-        start_time: chrono::Utc::now(),
-        shutdown,
-        tipset_send,
-        snapshot_progress_tracker: Default::default(),
-    };
 
     start_offline_rpc(rpc_state, rpc_port, shutdown_recv).await?;
 
     Ok(())
 }
@@ -190,10 +216,11 @@ where
 {
     info!("Starting offline RPC Server");
     let rpc_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), rpc_port);
+    let rpc_listener = tokio::net::TcpListener::bind(rpc_address).await?;
     let mut terminate = signal(SignalKind::terminate())?;
-
+    let (stop_handle, server_handle) = stop_channel();
    let result = tokio::select! {
-        ret = start_rpc(state, rpc_address, None) => ret,
+        _ = start_rpc(state, rpc_listener,stop_handle, None) => Ok(()),
         _ = ctrl_c() => {
             info!("Keyboard interrupt.");
             Ok(())
@@ -207,6 +234,9 @@ where
             Ok(())
         },
     };
+    if let Err(e) = server_handle.stop() {
+        tracing::warn!("{e}");
+    }
     crate::utils::io::terminal_cleanup();
     result
 }

From e16f394dc17f16b453c459c7143ba37520066732 Mon Sep 17 00:00:00 2001
From: hanabi1224 
Date: Wed, 28 Jan 2026 23:58:48 +0800
Subject: [PATCH 3/6] resolve AI comment

---
 src/tool/offline_server/server.rs                      | 6 +++++-
 src/tool/subcommands/api_cmd/generate_test_snapshot.rs | 5 ++++-
 src/tool/subcommands/api_cmd/test_snapshot.rs          | 5 ++++-
 3 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs
index a8bf2927273..5d581912e1f 100644
--- a/src/tool/offline_server/server.rs
+++ b/src/tool/offline_server/server.rs
@@ -78,13 +78,17 @@ where
     let state_manager = Arc::new(StateManager::new(chain_store.clone())?);
     let (network_send, _) = flume::bounded(5);
     let (tipset_send, _) = flume::bounded(5);
+
+    let mut services = JoinSet::new();
     let message_pool = MessagePool::new(
         MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
         network_send.clone(),
         Default::default(),
         state_manager.chain_config().clone(),
-        &mut JoinSet::new(),
+        &mut services,
     )?;
+    tokio::spawn(async move { while services.join_next().await.is_some() {} });
+
     let (shutdown, shutdown_recv) = mpsc::channel(1);
 
     let mut keystore = KeyStore::new(KeyStoreConfig::Memory)?;
diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs
index 46eb1baa09a..823c735a026 100644
--- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs
+++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs
@@ -122,13 +122,16 @@ async fn ctx(
     );
 
     let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap());
+
+    let mut services = JoinSet::new();
     let message_pool = MessagePool::new(
         MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
         network_send.clone(),
         Default::default(),
         state_manager.chain_config().clone(),
-        &mut JoinSet::new(),
+        &mut services,
     )?;
+    tokio::spawn(async move { while services.join_next().await.is_some() {} });
 
     let peer_manager = Arc::new(PeerManager::default());
     let sync_network_context =
diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs
index 80265660b6d..2a3155eb52e 100644
--- a/src/tool/subcommands/api_cmd/test_snapshot.rs
+++ b/src/tool/subcommands/api_cmd/test_snapshot.rs
@@ -139,13 +139,16 @@ async fn ctx(
         genesis_header.clone(),
     )?);
     let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap());
+
+    let mut services = JoinSet::new();
     let message_pool = MessagePool::new(
         MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
         network_send.clone(),
         Default::default(),
         state_manager.chain_config().clone(),
-        &mut JoinSet::new(),
+        &mut services,
     )?;
+    tokio::spawn(async move { while services.join_next().await.is_some() {} });
 
     let peer_manager = Arc::new(PeerManager::default());
     let sync_network_context =

From ad65ad2ec6b2266e49bf98219e6e8463dff8edaf Mon Sep 17 00:00:00 2001
From: hanabi1224 
Date: Thu, 29 Jan 2026 00:17:31 +0800
Subject: [PATCH 4/6] fix

---
 src/tool/offline_server/server.rs             | 16 ++++++++++++++--
 .../api_cmd/generate_test_snapshot.rs         | 14 +++++++++++++-
 src/tool/subcommands/api_cmd/test_snapshot.rs | 
14 +++++++++++++- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 5d581912e1f..03de766f8db 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -87,7 +87,19 @@ where state_manager.chain_config().clone(), &mut services, )?; - tokio::spawn(async move { while services.join_next().await.is_some() {} }); + tokio::spawn(async move { + while let Some(s) = services.join_next().await { + match s { + Ok(Ok(())) => {} + Ok(Err(e)) => { + tracing::warn!("{e}") + } + Err(e) => { + tracing::warn!("{e}") + } + } + } + }); let (shutdown, shutdown_recv) = mpsc::channel(1); @@ -224,7 +236,7 @@ where let mut terminate = signal(SignalKind::terminate())?; let (stop_handle, server_handle) = stop_channel(); let result = tokio::select! { - _ = start_rpc(state, rpc_listener,stop_handle, None) => Ok(()), + ret = start_rpc(state, rpc_listener,stop_handle, None) => ret, _ = ctrl_c() => { info!("Keyboard interrupt."); Ok(()) diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index 7c085aecde5..093d5d01e88 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -138,7 +138,19 @@ async fn ctx( state_manager.chain_config().clone(), &mut services, )?; - tokio::spawn(async move { while services.join_next().await.is_some() {} }); + tokio::spawn(async move { + while let Some(s) = services.join_next().await { + match s { + Ok(Ok(())) => {} + Ok(Err(e)) => { + tracing::warn!("{e}") + } + Err(e) => { + tracing::warn!("{e}") + } + } + } + }); let peer_manager = Arc::new(PeerManager::default()); let sync_network_context = diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 2a3155eb52e..dd83ca1236d 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -148,7 +148,19 @@ async fn ctx( state_manager.chain_config().clone(), &mut services, )?; - tokio::spawn(async move { while services.join_next().await.is_some() {} }); + tokio::spawn(async move { + while let Some(s) = services.join_next().await { + match s { + Ok(Ok(())) => {} + Ok(Err(e)) => { + tracing::warn!("{e}") + } + Err(e) => { + tracing::warn!("{e}") + } + } + } + }); let peer_manager = Arc::new(PeerManager::default()); let sync_network_context = From 02219833009719fe4e80a60b45562e7ccc4724cb Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 29 Jan 2026 00:47:22 +0800 Subject: [PATCH 5/6] do not leak admin credentials via logs --- src/daemon/context.rs | 4 +++- src/tool/offline_server/server.rs | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/daemon/context.rs b/src/daemon/context.rs index 3829156135e..f6a60538967 100644 --- a/src/daemon/context.rs +++ b/src/daemon/context.rs @@ -334,12 +334,13 @@ fn handle_admin_token( ki.private_key(), token_exp, )?; - info!("Admin token: {token}"); let default_token_path = config.client.default_rpc_token_path(); if let Err(e) = crate::utils::io::write_new_sensitive_file(token.as_bytes(), &default_token_path) { tracing::warn!("Failed to save the default admin token file: {e}"); + } else { + info!("Admin token is saved to {}", default_token_path.display()); } if let Some(path) = opts.save_token.as_ref() { if let Some(dir) = path.parent() @@ -354,6 +355,7 @@ fn handle_admin_token( } std::fs::write(path, &token) 
            .with_context(|| format!("Failed to save admin token to {}", path.display()))?;
+        info!("Admin token is saved to {}", path.display());
     }
 
     Ok(token)
diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs
index 03de766f8db..33dbb7219d8 100644
--- a/src/tool/offline_server/server.rs
+++ b/src/tool/offline_server/server.rs
@@ -115,9 +115,11 @@ where
         ki.private_key(),
         token_exp,
     )?;
-    info!("Admin token: {token}");
     if let Some(path) = save_jwt_token {
         std::fs::write(path, token)?;
+        info!("Admin token is saved to {}", path.display());
+    } else {
+        info!("Admin token generated. Use --save-token to persist it.");
     }

From e75c5bbe4d33f65827726b0cfc3d093aa64a1bac Mon Sep 17 00:00:00 2001
From: hanabi1224 
Date: Thu, 29 Jan 2026 01:04:44 +0800
Subject: [PATCH 6/6] resolve AI comments

---
 src/tool/offline_server/server.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs
index 33dbb7219d8..882726b07ba 100644
--- a/src/tool/offline_server/server.rs
+++ b/src/tool/offline_server/server.rs
@@ -44,6 +44,8 @@ use tokio::{
 };
 use tracing::{info, warn};
 
+/// Builds offline RPC state and returns it with a shutdown receiver.
+/// The receiver is notified when RPC shutdown is requested.
 pub async fn offline_rpc_state<DB>(
@@ -116,7 +118,7 @@ where
         token_exp,
     )?;
     if let Some(path) = save_jwt_token {
-        std::fs::write(path, token)?;
+        crate::utils::io::write_new_sensitive_file(token.as_bytes(), path)?;
         info!("Admin token is saved to {}", path.display());
     } else {
         info!("Admin token generated. Use --save-token to persist it.");
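
A note on the mechanism this series threads through the daemon and the offline server: `jsonrpsee::server::stop_channel()` yields a `(StopHandle, ServerHandle)` pair. `start_rpc` now receives the `StopHandle` (plus a pre-bound `TcpListener`) from its caller, while the caller keeps the `ServerHandle` and calls `stop()` to request a graceful shutdown — this is what lets the daemon stop the RPC server before a snapshot-GC reboot, and what lets the new unit test join the spawned server task cleanly. Below is a minimal, self-contained sketch of the same pattern; the `serve` function is a hypothetical stand-in for `start_rpc`, and exact `jsonrpsee` APIs depend on the version Forest pins:

```rust
use jsonrpsee::server::{StopHandle, stop_channel};

// Hypothetical stand-in for `start_rpc`: accept connections on a
// pre-bound listener until the paired `ServerHandle` signals shutdown.
async fn serve(listener: tokio::net::TcpListener, stop_handle: StopHandle) -> anyhow::Result<()> {
    loop {
        tokio::select! {
            // `StopHandle::shutdown()` resolves once `ServerHandle::stop()` is called.
            _ = stop_handle.clone().shutdown() => return Ok(()),
            res = listener.accept() => {
                let (_stream, _remote_addr) = res?;
                // hand the socket off to a per-connection RPC service here
            }
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Binding is done by the caller so tests can bind port 0 and read the
    // actual address back before the server starts.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
    let (stop_handle, server_handle) = stop_channel();
    let task = tokio::spawn(serve(listener, stop_handle));
    // ... serve traffic, then request a graceful shutdown and wait for it.
    server_handle.stop()?;
    task.await??;
    Ok(())
}
```

The `test_rpc_server` test added in `src/rpc/mod.rs` drives exactly this sequence against the real `start_rpc`, using `offline_rpc_state` to build an `RPCState` over an in-memory database.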