Skip to content
4 changes: 3 additions & 1 deletion src/daemon/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,13 @@ fn handle_admin_token(
ki.private_key(),
token_exp,
)?;
info!("Admin token: {token}");
let default_token_path = config.client.default_rpc_token_path();
if let Err(e) =
crate::utils::io::write_new_sensitive_file(token.as_bytes(), &default_token_path)
{
tracing::warn!("Failed to save the default admin token file: {e}");
} else {
info!("Admin token is saved to {}", default_token_path.display());
}
if let Some(path) = opts.save_token.as_ref() {
if let Some(dir) = path.parent()
Expand All @@ -354,6 +355,7 @@ fn handle_admin_token(
}
std::fs::write(path, &token)
.with_context(|| format!("Failed to save admin token to {}", path.display()))?;
info!("Admin token is saved to {}", path.display());
}

Ok(token)
Expand Down
20 changes: 17 additions & 3 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Resul
let start_time = chrono::Utc::now();
let mut terminate = signal(SignalKind::terminate())?;
let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);

let result = tokio::select! {
ret = start(start_time, opts, config, shutdown_send) => ret,
_ = ctrl_c() => {
Expand Down Expand Up @@ -375,6 +374,7 @@ fn maybe_start_rpc_service(
chain_follower: &ChainFollower<DbType>,
start_time: chrono::DateTime<chrono::Utc>,
shutdown: mpsc::Sender<()>,
rpc_stop_handle: jsonrpsee::server::StopHandle,
ctx: &AppContext,
) -> anyhow::Result<()> {
if config.client.enable_rpc {
Expand Down Expand Up @@ -402,6 +402,12 @@ fn maybe_start_rpc_service(
let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
let msgs_in_tipset = Arc::new(crate::chain::MsgsInTipsetCache::default());
async move {
let rpc_listener = tokio::net::TcpListener::bind(rpc_address)
.await
.map_err(|e| {
anyhow::anyhow!("Unable to listen on RPC endpoint {rpc_address}: {e}")
})
.unwrap();
start_rpc(
RPCState {
state_manager,
Expand All @@ -417,7 +423,8 @@ fn maybe_start_rpc_service(
tipset_send,
snapshot_progress_tracker,
},
rpc_address,
rpc_listener,
rpc_stop_handle,
filter_list,
)
.await
Expand Down Expand Up @@ -559,11 +566,16 @@ pub(super) async fn start(
});
}
loop {
let (rpc_stop_handle, rpc_server_handle) = jsonrpsee::server::stop_channel();
tokio::select! {
_ = snap_gc_reboot_rx.recv_async() => {
// gracefully shutdown RPC server
if let Err(e) = rpc_server_handle.stop() {
tracing::warn!("failed to stop RPC server: {e}");
}
snap_gc.cleanup_before_reboot().await;
}
result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), |ctx, sync_status| {
result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), rpc_stop_handle, |ctx, sync_status| {
snap_gc.set_db(ctx.db.clone());
snap_gc.set_sync_status(sync_status);
snap_gc.set_car_db_head_epoch(ctx.db.heaviest_tipset().map(|ts|ts.epoch()).unwrap_or_default());
Expand All @@ -579,6 +591,7 @@ pub(super) async fn start_services(
opts: &CliOpts,
mut config: Config,
shutdown_send: mpsc::Sender<()>,
rpc_stop_handle: jsonrpsee::server::StopHandle,
on_app_context_and_db_initialized: impl FnOnce(&AppContext, SyncStatus),
) -> anyhow::Result<()> {
// Cleanup the collector prometheus metrics registry on start
Expand Down Expand Up @@ -608,6 +621,7 @@ pub(super) async fn start_services(
&chain_follower,
start_time,
shutdown_send.clone(),
rpc_stop_handle,
&ctx,
)?;

Expand Down
24 changes: 19 additions & 5 deletions src/rpc/methods/auth.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::auth::*;
use crate::lotus_json::lotus_json_with_self;
use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError};
use crate::{
KeyStore,
auth::*,
lotus_json::lotus_json_with_self,
rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError},
};
use anyhow::Result;
use chrono::Duration;
use enumflags2::BitFlags;
Expand All @@ -14,6 +17,18 @@ use serde_with::{DurationSeconds, serde_as};

/// RPC call to create a new JWT Token
pub enum AuthNew {}

impl AuthNew {
pub fn create_token(
keystore: &KeyStore,
token_exp: Duration,
permissions: Vec<String>,
) -> anyhow::Result<String> {
let ki = keystore.get(JWT_IDENTIFIER)?;
Ok(create_token(permissions, ki.private_key(), token_exp)?)
}
}

impl RpcMethod<2> for AuthNew {
const NAME: &'static str = "Filecoin.AuthNew";
const N_REQUIRED_PARAMS: usize = 1;
Expand All @@ -28,14 +43,13 @@ impl RpcMethod<2> for AuthNew {
(permissions, expiration_secs): Self::Params,
) -> Result<Self::Ok, ServerError> {
let ks = ctx.keystore.read();
let ki = ks.get(JWT_IDENTIFIER)?;
// Lotus admin tokens do not expire but Forest requires all JWT tokens to
// have an expiration date. So we set the expiration date to 100 years in
// the future to match user-visible behavior of Lotus.
let token_exp = expiration_secs
.map(chrono::Duration::seconds)
.unwrap_or_else(|| chrono::Duration::days(365 * 100));
let token = create_token(permissions, ki.private_key(), token_exp)?;
let token = Self::create_token(&ks, token_exp, permissions)?;
Ok(token.as_bytes().to_vec())
}
}
Expand Down
75 changes: 66 additions & 9 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,11 +441,10 @@ use fvm_ipld_blockstore::Blockstore;
use jsonrpsee::{
Methods,
core::middleware::RpcServiceBuilder,
server::{RpcModule, Server, StopHandle, TowerServiceBuilder, stop_channel},
server::{RpcModule, Server, StopHandle, TowerServiceBuilder},
};
use parking_lot::RwLock;
use std::env;
use std::net::SocketAddr;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -527,7 +526,8 @@ struct PerConnection<RpcMiddleware, HttpMiddleware> {

pub async fn start_rpc<DB>(
state: RPCState<DB>,
rpc_endpoint: SocketAddr,
rpc_listener: tokio::net::TcpListener,
stop_handle: StopHandle,
filter_list: Option<FilterList>,
) -> anyhow::Result<()>
where
Expand All @@ -554,8 +554,6 @@ where
let methods: Arc<HashMap<ApiPaths, Methods>> =
Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect());

let (stop_handle, _server_handle) = stop_channel();

let per_conn = PerConnection {
stop_handle: stop_handle.clone(),
svc_builder: Server::builder()
Expand All @@ -578,12 +576,10 @@ where
.to_service_builder(),
keystore,
};

let listener = tokio::net::TcpListener::bind(rpc_endpoint).await.unwrap();
tracing::info!("Ready for RPC connections");
loop {
let sock = tokio::select! {
res = listener.accept() => {
res = rpc_listener.accept() => {
match res {
Ok((stream, _remote_addr)) => stream,
Err(e) => {
Expand Down Expand Up @@ -786,7 +782,13 @@ pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenR

#[cfg(test)]
mod tests {
use crate::rpc::ApiPaths;
use super::*;
use crate::{
db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion,
tool::offline_server::server::offline_rpc_state,
};
use jsonrpsee::server::stop_channel;
use std::net::{Ipv4Addr, SocketAddr};

// `cargo test --lib -- --exact 'rpc::tests::openrpc'`
// `cargo insta review`
Expand All @@ -805,4 +807,59 @@ mod tests {
insta::assert_yaml_snapshot!(_spec);
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_rpc_server() {
let chain = NetworkChain::Calibnet;
let db = Arc::new(MemoryDB::default());
let (state, mut shutdown_recv) = offline_rpc_state(chain, db, None, None).await.unwrap();
let block_delay_secs = state.chain_config().block_delay_secs;
let shutdown_send = state.shutdown.clone();
let jwt_read_permissions = vec!["read".to_owned()];
let jwt_read = super::methods::auth::AuthNew::create_token(
&state.keystore.read(),
chrono::Duration::hours(1),
jwt_read_permissions.clone(),
)
.unwrap();
let rpc_listener =
tokio::net::TcpListener::bind(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
.await
.unwrap();
let rpc_address = rpc_listener.local_addr().unwrap();
let (stop_handle, server_handle) = stop_channel();

// Start an RPC server

let handle = tokio::spawn(start_rpc(state, rpc_listener, stop_handle, None));

// Send a few requests

let client = Client::from_url(
format!("http://{}:{}/", rpc_address.ip(), rpc_address.port())
.parse()
.unwrap(),
);

let response = super::methods::common::Version::call(&client, ())
.await
.unwrap();
assert_eq!(
&response.version,
&*crate::utils::version::FOREST_VERSION_STRING
);
assert_eq!(response.block_delay, block_delay_secs);
assert_eq!(response.api_version, ShiftingVersion::new(2, 3, 0));

let response = super::methods::auth::AuthVerify::call(&client, (jwt_read,))
.await
.unwrap();
assert_eq!(response, jwt_read_permissions);

// Gracefully shutdown the RPC server
shutdown_send.send(()).await.unwrap();
shutdown_recv.recv().await;
server_handle.stop().unwrap();
handle.await.unwrap().unwrap();
}
}
2 changes: 1 addition & 1 deletion src/tool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT
pub mod main;
mod offline_server;
pub mod offline_server;
pub mod subcommands;
Loading
Loading